From 2660fbaa21452c4ab06bde6d01d90ead886d496a Mon Sep 17 00:00:00 2001
From: Alexey Arno
Date: Tue, 20 Oct 2015 17:59:29 +0300
Subject: [PATCH 01/23] dbms: Server: merged branch METR-16213 into master

---
 dbms/include/DB/Core/ErrorCodes.h             |   2 +
 dbms/include/DB/Interpreters/Cluster.h        |  80 +++---
 .../DistributedBlockOutputStream.h            |  12 +-
 .../DB/TableFunctions/TableFunctionRemote.h   |   9 +-
 dbms/src/Interpreters/Cluster.cpp             | 246 ++++++++++--------
 dbms/src/Storages/StorageDistributed.cpp      |  86 +++---
 .../Storages/System/StorageSystemClusters.cpp |   6 +-
 7 files changed, 250 insertions(+), 191 deletions(-)

diff --git a/dbms/include/DB/Core/ErrorCodes.h b/dbms/include/DB/Core/ErrorCodes.h
index c0aa6030748..a8445610850 100644
--- a/dbms/include/DB/Core/ErrorCodes.h
+++ b/dbms/include/DB/Core/ErrorCodes.h
@@ -297,6 +297,8 @@ namespace ErrorCodes
 		MONGODB_INIT_FAILED = 293,
 		INVALID_BLOCK_EXTRA_INFO = 294,
 		RECEIVED_EMPTY_DATA = 295,
+		NO_REMOTE_SHARD_FOUND = 296,
+		SHARD_HAS_NO_CONNECTIONS = 297,
 
 		KEEPER_EXCEPTION = 999,
 		POCO_EXCEPTION = 1000,
diff --git a/dbms/include/DB/Interpreters/Cluster.h b/dbms/include/DB/Interpreters/Cluster.h
index 074b039d0ea..50f16137f0c 100644
--- a/dbms/include/DB/Interpreters/Cluster.h
+++ b/dbms/include/DB/Interpreters/Cluster.h
@@ -13,7 +13,7 @@ namespace DB
 /// No connection is established to local nodes; the query is executed on them directly,
 /// so we store only the number of local nodes.
 /// In the config, a cluster consists of <node> or <shard> elements.
-class Cluster : private boost::noncopyable
+class Cluster
 {
 public:
 	Cluster(const Settings & settings, const String & cluster_name);
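NOTE: The hunk above drops boost::noncopyable; the next one replaces it with
explicitly deleted copy operations and keeps the saturate() helper, which clamps
each network timeout to the query's max_execution_time. A usage sketch with
assumed values, not part of the patch:

    Poco::Timespan receive_timeout(300, 0);    // 300 s, hypothetical setting
    Poco::Timespan max_execution_time(10, 0);  // 10 s, hypothetical limit
    // A zero limit means "unlimited"; otherwise the smaller of the two wins:
    Poco::Timespan effective = Cluster::saturate(receive_timeout, max_execution_time);
    // effective == Poco::Timespan(10, 0)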
@@ -22,28 +22,13 @@ public:
 	Cluster(const Settings & settings, std::vector<std::vector<String>> names,
 			const String & username, const String & password);
 
-	/// The number of ClickHouse server nodes located locally;
-	/// we access local nodes directly.
-	size_t getLocalNodesNum() const { return local_nodes_num; }
+	Cluster(const Cluster &) = delete;
+	Cluster & operator=(const Cluster &) = delete;
 
 	/// Used to impose an upper bound on the timeout.
 	static Poco::Timespan saturate(const Poco::Timespan & v, const Poco::Timespan & limit);
 
 public:
-	/// Connections to remote servers.
-	ConnectionPools pools;
-
-	struct ShardInfo
-	{
-		/// contains names of directories for asynchronous write to StorageDistributed
-		std::vector<std::string> dir_names;
-		UInt32 shard_num;
-		int weight;
-		size_t num_local_nodes;
-	};
-	std::vector<ShardInfo> shard_info_vec;
-	std::vector<size_t> slot_to_shard;
-
 	struct Address
 	{
 		/** In the config, addresses are either located in <node> elements:
@@ -73,26 +58,59 @@ public:
 		Address(const String & host_port_, const String & user_, const String & password_);
 	};
 
-private:
-	static bool isLocal(const Address & address);
+	using Addresses = std::vector<Address>;
+	using AddressesWithFailover = std::vector<Addresses>;
+
+	struct ShardInfo
+	{
+	public:
+		bool isLocal() const { return !local_addresses.empty(); }
+		bool hasRemoteConnections() const { return !pool.isNull(); }
+		size_t getLocalNodeCount() const { return local_addresses.size(); }
+
+	public:
+		/// contains names of directories for asynchronous write to StorageDistributed
+		std::vector<std::string> dir_names;
+		UInt32 shard_num;
+		int weight;
+		Addresses local_addresses;
+		mutable ConnectionPoolPtr pool;
+	};
+
+	using ShardsInfo = std::vector<ShardInfo>;
 
 public:
-	/// An array of shards. Each shard is the address of one server.
-	typedef std::vector<Address> Addresses;
+	const ShardsInfo & getShardsInfo() const { return shards_info; }
+	const Addresses & getShardsAddresses() const { return addresses; }
+	const AddressesWithFailover & getShardsWithFailoverAddresses() const { return addresses_with_failover; }
 
-	/// An array of shards. For each shard, an array of replica addresses (servers considered identical).
-	typedef std::vector<Addresses> AddressesWithFailover;
+	const ShardInfo * getAnyRemoteShardInfo() const { return any_remote_shard_info; }
 
-	const Addresses & getShardsInfo() const { return addresses; }
-	const AddressesWithFailover & getShardsWithFailoverInfo() const { return addresses_with_failover; }
-	const Addresses & getLocalShardsInfo() const { return local_addresses; }
+	/// The number of remote shards.
+	size_t getRemoteShardCount() const { return remote_shard_count; }
+
+	/// The number of ClickHouse server nodes located locally;
+	/// we access local nodes directly.
+	size_t getLocalShardCount() const { return local_shard_count; }
+
+public:
+	std::vector<size_t> slot_to_shard;
 
 private:
-	Addresses addresses;
-	AddressesWithFailover addresses_with_failover;
-	Addresses local_addresses;
+	void initMisc();
 
-	size_t local_nodes_num = 0;
+private:
+	/// Description of the cluster's shards.
+	ShardsInfo shards_info;
+	/// Any remote shard.
+	ShardInfo * any_remote_shard_info = nullptr;
+
+	/// An array of shards. Each shard is the address of one server.
+	Addresses addresses;
+	/// An array of shards. For each shard, an array of replica addresses (servers considered identical).
+	AddressesWithFailover addresses_with_failover;
+
+	size_t remote_shard_count = 0;
+	size_t local_shard_count = 0;
 };
 
 struct Clusters
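NOTE: Callers no longer poke at parallel vectors (pools, shard_info_vec and the
local node counter); each ShardInfo now carries everything needed to dispatch
work for its shard. A minimal consumer sketch, not part of the patch
(processLocally/processRemotely are hypothetical callbacks):

    for (const auto & shard_info : cluster.getShardsInfo())
    {
        if (shard_info.isLocal())
            processLocally(shard_info.local_addresses);   // direct, no network hop
        else if (shard_info.hasRemoteConnections())
            processRemotely(shard_info.pool);             // ConnectionPoolPtr with failover
    }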
diff --git a/dbms/include/DB/Storages/Distributed/DistributedBlockOutputStream.h b/dbms/include/DB/Storages/Distributed/DistributedBlockOutputStream.h
index 318316242ce..38601adf9b4 100644
--- a/dbms/include/DB/Storages/Distributed/DistributedBlockOutputStream.h
+++ b/dbms/include/DB/Storages/Distributed/DistributedBlockOutputStream.h
@@ -39,7 +39,7 @@ public:
 
 	void write(const Block & block) override
 	{
-		if (storage.getShardingKeyExpr() && storage.cluster.shard_info_vec.size() > 1)
+		if (storage.getShardingKeyExpr() && (storage.cluster.getShardsInfo().size() > 1))
 			return writeSplit(block);
 
 		writeImpl(block);
@@ -50,7 +50,7 @@ private:
 	static std::vector<IColumn::Filter> createFiltersImpl(const size_t num_rows, const IColumn * column, const Cluster & cluster)
 	{
 		const auto total_weight = cluster.slot_to_shard.size();
-		const auto num_shards = cluster.shard_info_vec.size();
+		const auto num_shards = cluster.getShardsInfo().size();
 		std::vector<IColumn::Filter> filters(num_shards);
 
 		/** In C++, division with remainder of a negative number by a positive one yields a negative remainder.
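NOTE: The comment above is cut off by the hunk context. The usual resolution, and
presumably what the elided remainder describes, is to reinterpret the signed
sharding key as unsigned before taking the remainder, so the result always indexes
slot_to_shard. A sketch under that assumption, not part of the patch:

    // -7 % 5 == -2 in C++, which would be an invalid slot index, so:
    inline size_t keyToSlot(Int64 key, size_t total_weight)
    {
        return static_cast<UInt64>(key) % total_weight;   // always in [0, total_weight)
    }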
@@ -123,7 +123,7 @@ private:
 
 		auto filters = createFilters(block);
 
-		const auto num_shards = storage.cluster.shard_info_vec.size();
+		const auto num_shards = storage.cluster.getShardsInfo().size();
 		for (size_t i = 0; i < num_shards; ++i)
 		{
 			auto target_block = block.cloneEmpty();
@@ -138,9 +138,9 @@ private:
 
 	void writeImpl(const Block & block, const size_t shard_id = 0)
 	{
-		const auto & shard_info = storage.cluster.shard_info_vec[shard_id];
-		if (shard_info.num_local_nodes)
-			writeToLocal(block, shard_info.num_local_nodes);
+		const auto & shard_info = storage.cluster.getShardsInfo()[shard_id];
+		if (shard_info.getLocalNodeCount() > 0)
+			writeToLocal(block, shard_info.getLocalNodeCount());
 
 		/// dir_names is empty if shard has only local addresses
 		if (!shard_info.dir_names.empty())
diff --git a/dbms/include/DB/TableFunctions/TableFunctionRemote.h b/dbms/include/DB/TableFunctions/TableFunctionRemote.h
index 894fc4d5cc0..e454ad6df6f 100644
--- a/dbms/include/DB/TableFunctions/TableFunctionRemote.h
+++ b/dbms/include/DB/TableFunctions/TableFunctionRemote.h
@@ -132,10 +132,15 @@ private:
 		Settings settings = context.getSettings();
 		NamesAndTypesList res;
 
-		/// Send the query to the first shard we find.
+		/// Send the query to the first remote shard we find.
+		const auto shard_info = cluster.getAnyRemoteShardInfo();
+		if (shard_info == nullptr)
+			throw Exception("No remote shard found", ErrorCodes::NO_REMOTE_SHARD_FOUND);
+		ConnectionPoolPtr pool = shard_info->pool;
+
 		BlockInputStreamPtr input{
 			new RemoteBlockInputStream{
-				cluster.pools.front().get(), query, &settings, nullptr,
+				pool.get(), query, &settings, nullptr,
 				Tables(), QueryProcessingStage::Complete, context}
 		};
 		input->readPrefix();
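NOTE: The Cluster.cpp changes below move free helpers into an unnamed namespace and
keep the DNS lookups memoized through SimpleCache. A simplified stand-in
illustrating the assumed contract (cache the result per argument; the real
SimpleCache template differs), not part of the patch:

    // Needs <map> and <mutex>.
    template <typename Result, typename Arg, Result (*func)(const Arg &)>
    class CallCacheSketch
    {
    public:
        Result operator()(const Arg & arg)
        {
            std::lock_guard<std::mutex> lock(mutex);
            auto it = cache.find(arg);
            if (it == cache.end())
                it = cache.emplace(arg, func(arg)).first;   // first call resolves, later calls hit the cache
            return it->second;
        }
    private:
        std::map<Arg, Result> cache;
        std::mutex mutex;
    };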
diff --git a/dbms/src/Interpreters/Cluster.cpp b/dbms/src/Interpreters/Cluster.cpp
index 025780f1ae7..a4704059a28 100644
--- a/dbms/src/Interpreters/Cluster.cpp
+++ b/dbms/src/Interpreters/Cluster.cpp
@@ -8,34 +8,67 @@
 namespace DB
 {
 
+namespace
+{
+
+/// The default shard weight.
+static constexpr int default_weight = 1;
+
+inline bool isLocal(const Cluster::Address & address)
+{
+	/// If there is a replica for which:
+	/// - its port matches the port the server listens on;
+	/// - its host resolves to a set of addresses, one of which matches one of the addresses of the server's network interfaces,
+	/// then we must always access this shard directly, without inter-process communication.
+	return isLocalAddress(address.resolved_address);
+}
+
+inline std::string addressToDirName(const Cluster::Address & address)
+{
+	return
+		escapeForFileName(address.user) +
+		(address.password.empty() ? "" : (':' + escapeForFileName(address.password))) + '@' +
+		escapeForFileName(address.resolved_address.host().toString()) + ':' +
+		std::to_string(address.resolved_address.port());
+}
+
+inline bool beginsWith(const std::string & str1, const char * str2)
+{
+	if (str2 == nullptr)
+		throw Exception("Passed null pointer to function beginsWith", ErrorCodes::LOGICAL_ERROR);
+	return 0 == strncmp(str1.data(), str2, strlen(str2));
+}
+
 /// For caching DNS requests.
-static Poco::Net::SocketAddress resolveSocketAddressImpl1(const String & host, UInt16 port)
+Poco::Net::SocketAddress resolveSocketAddressImpl1(const String & host, UInt16 port)
 {
 	return Poco::Net::SocketAddress(host, port);
 }
 
-static Poco::Net::SocketAddress resolveSocketAddressImpl2(const String & host_and_port)
+Poco::Net::SocketAddress resolveSocketAddressImpl2(const String & host_and_port)
 {
 	return Poco::Net::SocketAddress(host_and_port);
 }
 
-static Poco::Net::SocketAddress resolveSocketAddress(const String & host, UInt16 port)
+Poco::Net::SocketAddress resolveSocketAddress(const String & host, UInt16 port)
 {
 	static SimpleCache<decltype(resolveSocketAddressImpl1), &resolveSocketAddressImpl1> cache;
 	return cache(host, port);
 }
 
-static Poco::Net::SocketAddress resolveSocketAddress(const String & host_and_port)
+Poco::Net::SocketAddress resolveSocketAddress(const String & host_and_port)
 {
 	static SimpleCache<decltype(resolveSocketAddressImpl2), &resolveSocketAddressImpl2> cache;
 	return cache(host_and_port);
 }
 
+}
+
+/// Implementation of the Cluster::Address class
 
 Cluster::Address::Address(const String & config_prefix)
 {
-	auto & config = Poco::Util::Application::instance().config();
+	const auto & config = Poco::Util::Application::instance().config();
 
 	host_name = config.getString(config_prefix + ".host");
 	port = config.getInt(config_prefix + ".port");
@@ -51,7 +84,7 @@ Cluster::Address::Address(const String & host_port_, const String & user_, const
 	UInt16 default_port = Poco::Util::Application::instance().config().getInt("tcp_port", 0);
 
 	/// It looks like the host_port_ string contains a port. Even if the condition holds, it does not necessarily mean a port is actually there (example: [::]).
-	if (nullptr != strchr(host_port_.c_str(), ':') || !default_port)
+	if ((nullptr != strchr(host_port_.c_str(), ':')) || !default_port)
 	{
 		resolved_address = resolveSocketAddress(host_port_);
 		host_name = host_port_.substr(0, host_port_.find(':'));
@@ -65,19 +98,7 @@ Cluster::Address::Address(const String & host_port_, const String & user_, const
 	}
 }
 
-
-namespace
-{
-	inline std::string addressToDirName(const Cluster::Address & address)
-	{
-		return
-			escapeForFileName(address.user) +
-			(address.password.empty() ? "" : (':' + escapeForFileName(address.password))) + '@' +
-			escapeForFileName(address.resolved_address.host().toString()) + ':' +
-			std::to_string(address.resolved_address.port());
-	}
-}
-
+/// Implementation of the Clusters class
 
 Clusters::Clusters(const Settings & settings, const String & config_name)
 {
@@ -85,17 +106,16 @@ Clusters::Clusters(const Settings & settings, const String & config_name)
 	Poco::Util::AbstractConfiguration & config = Poco::Util::Application::instance().config();
 	Poco::Util::AbstractConfiguration::Keys config_keys;
 	config.keys(config_name, config_keys);
 
-	for (Poco::Util::AbstractConfiguration::Keys::const_iterator it = config_keys.begin(); it != config_keys.end(); ++it)
+	for (const auto & key : config_keys)
 		impl.emplace(std::piecewise_construct,
-			std::forward_as_tuple(*it),
-			std::forward_as_tuple(settings, config_name + "." + *it));
+			std::forward_as_tuple(key),
+			std::forward_as_tuple(settings, config_name + "." + key));
 }
 
+/// Implementation of the Cluster class
 
 Cluster::Cluster(const Settings & settings, const String & cluster_name)
 {
-	/// Create the cluster.
-
 	Poco::Util::AbstractConfiguration & config = Poco::Util::Application::instance().config();
 	Poco::Util::AbstractConfiguration::Keys config_keys;
 	config.keys(cluster_name, config_keys);
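NOTE: Because Cluster is now noncopyable, the Clusters registry above has to build
each Cluster in place: std::map::emplace with std::piecewise_construct forwards the
key and the constructor arguments as separate tuples, so no temporary Cluster is
copied or moved. A self-contained sketch of the idiom with a toy type, not part of
the patch:

    #include <map>
    #include <string>
    #include <tuple>

    struct NonCopyable
    {
        explicit NonCopyable(int x_) : x(x_) {}
        NonCopyable(const NonCopyable &) = delete;
        int x;
    };

    std::map<std::string, NonCopyable> registry;
    // registry.emplace("a", NonCopyable(1));   // would not compile: needs a copy/move
    void add()
    {
        registry.emplace(std::piecewise_construct,
                         std::forward_as_tuple("a"),
                         std::forward_as_tuple(1));   // constructs the value in place
    }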
@@ -104,35 +124,56 @@ Cluster::Cluster(const Settings & settings, const String & cluster_name)
 
 	UInt32 current_shard_num = 1;
 
-	for (auto it = config_keys.begin(); it != config_keys.end(); ++it)
+	for (const auto & key : config_keys)
 	{
-		if (0 == strncmp(it->c_str(), "node", strlen("node")))
+		if (beginsWith(key, "node"))
 		{
-			const auto & prefix = config_prefix + *it;
-			const auto weight = config.getInt(prefix + ".weight", 1);
+			/// A shard without replicas.
+
+			const auto & prefix = config_prefix + key;
+			const auto weight = config.getInt(prefix + ".weight", default_weight);
 			if (weight == 0)
 				continue;
 
 			addresses.emplace_back(prefix);
 			addresses.back().replica_num = 1;
+			const auto & address = addresses.back();
 
-			slot_to_shard.insert(std::end(slot_to_shard), weight, shard_info_vec.size());
-			if (const auto is_local = isLocal(addresses.back()))
-				shard_info_vec.push_back({{}, current_shard_num, weight, is_local});
+			ShardInfo info;
+			info.shard_num = current_shard_num;
+			info.weight = weight;
+
+			if (isLocal(address))
+				info.local_addresses.push_back(address);
 			else
-				shard_info_vec.push_back({{addressToDirName(addresses.back())}, current_shard_num, weight, is_local});
+			{
+				info.dir_names.push_back(addressToDirName(address));
+				info.pool = new ConnectionPool(
+					settings.distributed_connections_pool_size,
+					address.host_name, address.port, address.resolved_address,
+					"", address.user, address.password,
+					"server", Protocol::Compression::Enable,
+					saturate(settings.connect_timeout, settings.limits.max_execution_time),
+					saturate(settings.receive_timeout, settings.limits.max_execution_time),
+					saturate(settings.send_timeout, settings.limits.max_execution_time));
+			}
+
+			slot_to_shard.insert(std::end(slot_to_shard), weight, shards_info.size());
+			shards_info.push_back(info);
 		}
-		else if (0 == strncmp(it->c_str(), "shard", strlen("shard")))
+		else if (beginsWith(key, "shard"))
 		{
+			/// A shard with replicas.
+
 			Poco::Util::AbstractConfiguration::Keys replica_keys;
-			config.keys(config_prefix + *it, replica_keys);
+			config.keys(config_prefix + key, replica_keys);
 
 			addresses_with_failover.emplace_back();
 			Addresses & replica_addresses = addresses_with_failover.back();
 			UInt32 current_replica_num = 1;
 
-			const auto & partial_prefix = config_prefix + *it + ".";
-			const auto weight = config.getInt(partial_prefix + ".weight", 1);
+			const auto & partial_prefix = config_prefix + key + ".";
+			const auto weight = config.getInt(partial_prefix + ".weight", default_weight);
 
 			if (weight == 0)
 				continue;
 
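NOTE: slot_to_shard repeats each shard's index `weight` times, so weighted routing
reduces to a single array lookup. Worked example with assumed weights, not part of
the patch — shards with weights 2, 1 and 3 give:

    #include <vector>
    std::vector<size_t> slot_to_shard;
    slot_to_shard.insert(std::end(slot_to_shard), 2, 0);   // shard 0, weight 2 -> slots {0, 0}
    slot_to_shard.insert(std::end(slot_to_shard), 1, 1);   // shard 1, weight 1 -> slot  {1}
    slot_to_shard.insert(std::end(slot_to_shard), 3, 2);   // shard 2, weight 3 -> slots {2, 2, 2}
    // slot_to_shard == {0, 0, 1, 2, 2, 2}; total_weight == 6,
    // and a key with key % 6 == 4 is routed to shard 2.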
@@ -142,26 +183,20 @@ Cluster::Cluster(const Settings & settings, const String & cluster_name)
 			 * the first element of vector; otherwise we will just .emplace_back
 			 */
 			std::vector<std::string> dir_names{};
-			size_t num_local_nodes = 0;
 
 			auto first = true;
-			for (auto jt = replica_keys.begin(); jt != replica_keys.end(); ++jt)
+			for (const auto & replica_key : replica_keys)
 			{
-				if (0 == strncmp(jt->data(), "weight", strlen("weight")) ||
-					0 == strncmp(jt->data(), "internal_replication", strlen("internal_replication")))
+				if (beginsWith(replica_key, "weight") || beginsWith(replica_key, "internal_replication"))
 					continue;
 
-				if (0 == strncmp(jt->c_str(), "replica", strlen("replica")))
+				if (beginsWith(replica_key, "replica"))
 				{
-					replica_addresses.emplace_back(partial_prefix + *jt);
+					replica_addresses.emplace_back(partial_prefix + replica_key);
 					replica_addresses.back().replica_num = current_replica_num;
 					++current_replica_num;
 
-					if (isLocal(replica_addresses.back()))
-					{
-						++num_local_nodes;
-					}
-					else
+					if (!isLocal(replica_addresses.back()))
 					{
 						if (internal_replication)
 						{
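NOTE: As the truncated comment above says, with internal_replication the directory
names of all remote replicas are folded into the first vector element, so the shard
gets a single async-write directory and replication is left to the underlying
replicated table; without it, every replica gets its own directory and the
distributed table fans writes out itself. Illustration with assumed addresses and
an assumed ',' separator (the join itself is outside this hunk's context):

    // internal_replication = false:
    //     dir_names == { "user@host1:9000", "user@host2:9000" }
    // internal_replication = true:
    //     dir_names == { "user@host1:9000,user@host2:9000" }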
@@ -178,40 +213,18 @@ Cluster::Cluster(const Settings & settings, const String & cluster_name)
 					}
 				}
 				else
-					throw Exception("Unknown element in config: " + *jt, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
+					throw Exception("Unknown element in config: " + replica_key, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
 			}
 
-			slot_to_shard.insert(std::end(slot_to_shard), weight, shard_info_vec.size());
-			shard_info_vec.push_back({std::move(dir_names), current_shard_num, weight, num_local_nodes});
-		}
-		else
-			throw Exception("Unknown element in config: " + *it, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
+			Addresses shard_local_addresses;
 
-		++current_shard_num;
-	}
-
-	/// Create the corresponding connection pools.
-
-	if (!addresses_with_failover.empty() && !addresses.empty())
-		throw Exception("There must be either 'node' or 'shard' elements in config", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
-
-	if (!addresses_with_failover.empty())
-	{
-		for (const auto & shard : addresses_with_failover)
-		{
 			ConnectionPools replicas;
-			replicas.reserve(shard.size());
+			replicas.reserve(replica_addresses.size());
 
-			bool has_local_replica = false;
-
-			for (const auto & replica : shard)
+			for (const auto & replica : replica_addresses)
 			{
 				if (isLocal(replica))
-				{
-					has_local_replica = true;
-					local_addresses.push_back(replica);
-					break;
-				}
+					shard_local_addresses.push_back(replica);
 				else
 				{
 					replicas.emplace_back(new ConnectionPool(
@@ -225,42 +238,31 @@ Cluster::Cluster(const Settings & settings, const String & cluster_name)
 				}
 			}
 
-			if (has_local_replica)
-				++local_nodes_num;
-			else
-				pools.emplace_back(new ConnectionPoolWithFailover(replicas, settings.load_balancing, settings.connections_with_failover_max_tries));
+			ConnectionPoolPtr shard_pool;
+			if (!replicas.empty())
+				shard_pool = new ConnectionPoolWithFailover(replicas, settings.load_balancing, settings.connections_with_failover_max_tries);
+
+			slot_to_shard.insert(std::end(slot_to_shard), weight, shards_info.size());
+			shards_info.push_back({std::move(dir_names), current_shard_num, weight, shard_local_addresses, shard_pool});
 		}
+		else
+			throw Exception("Unknown element in config: " + key, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
+
+		if (!addresses_with_failover.empty() && !addresses.empty())
+			throw Exception("There must be either 'node' or 'shard' elements in config", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
+
+		++current_shard_num;
 	}
-	else if (!addresses.empty())
-	{
-		for (const auto & address : addresses)
-		{
-			if (isLocal(address))
-			{
-				local_addresses.push_back(address);
-				++local_nodes_num;
-			}
-			else
-			{
-				pools.emplace_back(new ConnectionPool(
-					settings.distributed_connections_pool_size,
-					address.host_name, address.port, address.resolved_address,
-					"", address.user, address.password,
-					"server", Protocol::Compression::Enable,
-					saturate(settings.connect_timeout, settings.limits.max_execution_time),
-					saturate(settings.receive_timeout, settings.limits.max_execution_time),
-					saturate(settings.send_timeout, settings.limits.max_execution_time)));
-			}
-		}
-	}
-	else
-		throw Exception("No addresses listed in config", ErrorCodes::NO_ELEMENTS_IN_CONFIG);
+
+	initMisc();
 }
 
 
 Cluster::Cluster(const Settings & settings, std::vector<std::vector<String>> names,
 				 const String & username, const String & password)
 {
+	UInt32 current_shard_num = 1;
+
 	for (const auto & shard : names)
 	{
 		Addresses current;
@@ -284,8 +286,14 @@ Cluster::Cluster(const Settings & settings, std::vector<std::vector<String>> names
 				saturate(settings.send_timeout, settings.limits.max_execution_time)));
 		}
 
-		pools.emplace_back(new ConnectionPoolWithFailover(replicas, settings.load_balancing, settings.connections_with_failover_max_tries));
+		ConnectionPoolPtr shard_pool = new ConnectionPoolWithFailover(replicas, settings.load_balancing, settings.connections_with_failover_max_tries);
+
+		slot_to_shard.insert(std::end(slot_to_shard), default_weight, shards_info.size());
+		shards_info.push_back({{}, current_shard_num, default_weight, {}, shard_pool});
+		++current_shard_num;
 	}
+
+	initMisc();
 }
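NOTE: The second constructor is the one the remote() table function relies on:
each entry of `names` is one shard given as a list of replica host:port strings,
every shard gets default_weight, and each shard owns a failover pool over its
replicas. Usage sketch with placeholder hosts, not part of the patch:

    std::vector<std::vector<String>> names =
    {
        { "example01:9000", "example01-backup:9000" },   // shard 1 and its replica
        { "example02:9000" }                             // shard 2, single replica
    };
    Cluster cluster(settings, names, "default", "");     // username, empty password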
@@ -294,17 +302,35 @@ Poco::Timespan Cluster::saturate(const Poco::Timespan & v, const Poco::Timespan & limit)
 	if (limit.totalMicroseconds() == 0)
 		return v;
 	else
-		return v > limit ? limit : v;
+		return (v > limit) ? limit : v;
 }
 
 
-bool Cluster::isLocal(const Address & address)
+void Cluster::initMisc()
 {
-	/// If there is a replica for which:
-	/// - its port matches the port the server listens on;
-	/// - its host resolves to a set of addresses, one of which matches one of the addresses of the server's network interfaces,
-	/// then we must always access this shard directly, without inter-process communication.
-	return isLocalAddress(address.resolved_address);
+	for (const auto & shard_info : shards_info)
+	{
+		if (!shard_info.isLocal() && !shard_info.hasRemoteConnections())
+			throw Exception("Found shard without any specified connection",
+				ErrorCodes::SHARD_HAS_NO_CONNECTIONS);
+	}
+
+	for (const auto & shard_info : shards_info)
+	{
+		if (shard_info.isLocal())
+			++local_shard_count;
+		else
+			++remote_shard_count;
+	}
+
+	for (auto & shard_info : shards_info)
+	{
+		if (!shard_info.isLocal())
+		{
+			any_remote_shard_info = &shard_info;
+			break;
+		}
+	}
 }
 
 }
diff --git a/dbms/src/Storages/StorageDistributed.cpp b/dbms/src/Storages/StorageDistributed.cpp
index 9de9b99ac7d..030a04dfae2 100644
--- a/dbms/src/Storages/StorageDistributed.cpp
+++ b/dbms/src/Storages/StorageDistributed.cpp
@@ -80,7 +80,7 @@ StorageDistributed::StorageDistributed(
 	context(context_), cluster(cluster_),
 	sharding_key_expr(sharding_key_ ? ExpressionAnalyzer(sharding_key_, context, nullptr, *columns).getActions(false) : nullptr),
 	sharding_key_column_name(sharding_key_ ? sharding_key_->getColumnName() : String{}),
-	write_enabled(!data_path_.empty() && (cluster.getLocalNodesNum() + cluster.pools.size() < 2 || sharding_key_)),
+	write_enabled(!data_path_.empty() && (((cluster.getLocalShardCount() + cluster.getRemoteShardCount()) < 2) || sharding_key_)),
 	path(data_path_.empty() ? "" : (data_path_ + escapeForFileName(name) + '/'))
 {
 	createDirectoryMonitors();
@@ -104,7 +104,7 @@ StorageDistributed::StorageDistributed(
 	context(context_), cluster(cluster_),
 	sharding_key_expr(sharding_key_ ? ExpressionAnalyzer(sharding_key_, context, nullptr, *columns).getActions(false) : nullptr),
 	sharding_key_column_name(sharding_key_ ? sharding_key_->getColumnName() : String{}),
-	write_enabled(!data_path_.empty() && (cluster.getLocalNodesNum() + cluster.pools.size() < 2 || sharding_key_)),
+	write_enabled(!data_path_.empty() && (((cluster.getLocalShardCount() + cluster.getRemoteShardCount()) < 2) || sharding_key_)),
 	path(data_path_.empty() ? "" : (data_path_ + escapeForFileName(name) + '/'))
 {
 	createDirectoryMonitors();
@@ -168,7 +168,7 @@ BlockInputStreams StorageDistributed::read(
 	/// Makes no sense on remote servers, since the query is usually sent under a different user.
 	new_settings.max_concurrent_queries_for_user = 0;
 
-	size_t result_size = (cluster.pools.size() * settings.max_parallel_replicas) + cluster.getLocalNodesNum();
+	size_t result_size = (cluster.getRemoteShardCount() * settings.max_parallel_replicas) + cluster.getLocalShardCount();
 
 	processed_stage = result_size == 1 || settings.distributed_group_by_no_merge
 		? QueryProcessingStage::Complete
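NOTE: result_size decides whether remote servers may send back final results or
only partially aggregated ones. Worked example with assumed numbers, not part of
the patch — 3 remote shards, 1 local shard and max_parallel_replicas = 2 give:

    size_t result_size = (3 * 2) + 1;   // == 7 result streams
    // result_size != 1, so processed_stage stays below Complete and the
    // initiating server merges; a single-shard cluster (result_size == 1)
    // returns Complete results with no extra merge step.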
@@ -193,26 +193,31 @@ BlockInputStreams StorageDistributed::read(
 		external_tables = context.getExternalTables();
 
 	/// Loop over the shards.
-	for (auto & conn_pool : cluster.pools)
-		res.emplace_back(new RemoteBlockInputStream{
-			conn_pool, modified_query, &new_settings, throttler,
-			external_tables, processed_stage, context});
-
-	/// Add queries to the local ClickHouse.
-
-	if (cluster.getLocalNodesNum() > 0)
+	for (const auto & shard_info : cluster.getShardsInfo())
 	{
-		DB::Context new_context = context;
-		new_context.setSettings(new_settings);
-
-		for (size_t i = 0; i < cluster.getLocalNodesNum(); ++i)
+		if (shard_info.isLocal())
 		{
-			InterpreterSelectQuery interpreter(modified_query_ast, new_context, processed_stage);
+			/// Add queries to the local ClickHouse.
 
-			/** Materialization is needed, since constants arrive from remote servers already materialized.
-			 * Otherwise different threads would produce different column types (Const and non-Const),
-			 * which is not allowed, since all the code assumes that every block in a stream has the same types.
-			 */
-			res.emplace_back(new MaterializingBlockInputStream(interpreter.execute().in));
+			DB::Context new_context = context;
+			new_context.setSettings(new_settings);
+
+			for (const auto & address : shard_info.local_addresses)
+			{
+				InterpreterSelectQuery interpreter(modified_query_ast, new_context, processed_stage);
+
+				/** Materialization is needed, since constants arrive from remote servers already materialized.
+				 * Otherwise different threads would produce different column types (Const and non-Const),
+				 * which is not allowed, since all the code assumes that every block in a stream has the same types.
+				 */
+				res.emplace_back(new MaterializingBlockInputStream(interpreter.execute().in));
+			}
+		}
+		else
+		{
+			res.emplace_back(new RemoteBlockInputStream{
+				shard_info.pool, modified_query, &new_settings, throttler,
+				external_tables, processed_stage, context});
 		}
 	}
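NOTE: describe() below follows the same local/remote split, and getShardCount() is
redefined further down to count only remote shards. Callers that previously read
pools.size() as "all shards" should be rechecked; with assumed counts of 1 local
and 2 remote shards:

    size_t n = storage.getShardCount();              // now 2 (remote shards only)
    size_t all = cluster.getShardsInfo().size();     // 3, the local shard included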
@@ -274,26 +279,29 @@ BlockInputStreams StorageDistributed::describe(const Context & context, const Settings & settings)
 	BlockInputStreams res;
 
 	/// Loop over the shards.
-	for (auto & conn_pool : cluster.pools)
+	for (const auto & shard_info : cluster.getShardsInfo())
 	{
-		auto stream = new RemoteBlockInputStream{conn_pool, query, &new_settings, throttler};
-		stream->reachAllReplicas();
-		stream->appendExtraInfo();
-		res.emplace_back(stream);
-	}
-
-	/// Add queries to the local ClickHouse.
-
-	if (cluster.getLocalNodesNum() > 0)
-	{
-		DB::Context new_context = context;
-		new_context.setSettings(new_settings);
-
-		const auto & local_addresses = cluster.getLocalShardsInfo();
-		for (const auto & address : local_addresses)
+		if (shard_info.isLocal())
 		{
-			InterpreterDescribeQuery interpreter(ast, new_context);
-			BlockInputStreamPtr stream = new MaterializingBlockInputStream(interpreter.execute().in);
-			stream = new BlockExtraInfoInputStream(stream, toBlockExtraInfo(address));
+			/// Add queries to the local ClickHouse.
+
+			DB::Context new_context = context;
+			new_context.setSettings(new_settings);
+
+			for (const auto & address : shard_info.local_addresses)
+			{
+				InterpreterDescribeQuery interpreter(ast, new_context);
+				BlockInputStreamPtr stream = new MaterializingBlockInputStream(interpreter.execute().in);
+				stream = new BlockExtraInfoInputStream(stream, toBlockExtraInfo(address));
+				res.emplace_back(stream);
+			}
+		}
+
+		if (shard_info.hasRemoteConnections())
+		{
+			auto stream = new RemoteBlockInputStream{shard_info.pool, query, &new_settings, throttler};
+			stream->reachAllReplicas();
+			stream->appendExtraInfo();
 			res.emplace_back(stream);
 		}
 	}
@@ -340,7 +348,7 @@ void StorageDistributed::requireDirectoryMonitor(const std::string & name)
 
 size_t StorageDistributed::getShardCount() const
 {
-	return cluster.pools.size();
+	return cluster.getRemoteShardCount();
 }
 
 }
diff --git a/dbms/src/Storages/System/StorageSystemClusters.cpp b/dbms/src/Storages/System/StorageSystemClusters.cpp
index b636e1274ee..43fee761450 100644
--- a/dbms/src/Storages/System/StorageSystemClusters.cpp
+++ b/dbms/src/Storages/System/StorageSystemClusters.cpp
@@ -72,9 +72,9 @@ BlockInputStreams StorageSystemClusters::read(
 	{
 		const std::string cluster_name = entry.first;
 		const Cluster & cluster = entry.second;
-		const auto & addresses = cluster.getShardsInfo();
-		const auto & addresses_with_failover = cluster.getShardsWithFailoverInfo();
-		const auto & shards_info = cluster.shard_info_vec;
+		const auto & addresses = cluster.getShardsAddresses();
+		const auto & addresses_with_failover = cluster.getShardsWithFailoverAddresses();
+		const auto & shards_info = cluster.getShardsInfo();
 
 		if (!addresses.empty())
 		{

From 61691234ca7370a1d09c9caa9ec86b6257ab28ec Mon Sep 17 00:00:00 2001
From: Andrey Mironov
Date: Tue, 20 Oct 2015 19:01:45 +0300
Subject: [PATCH 02/23] ext::map: use c++14 return type deduction

---
 libs/libcommon/include/ext/map.hpp | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/libs/libcommon/include/ext/map.hpp b/libs/libcommon/include/ext/map.hpp
index 6a7941614ec..ac2fbe083f5 100644
--- a/libs/libcommon/include/ext/map.hpp
+++ b/libs/libcommon/include/ext/map.hpp
@@ -53,7 +53,6 @@ namespace ext
 	 * with each element transformed by the application of `mapper`.
 	 */
 	template
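NOTE: According to its subject, patch 02 switches ext::map to C++14 return type
deduction, i.e. the function body determines the return type and the C++11
trailing-return-type boilerplate goes away. A minimal self-contained illustration
of the language feature (toy code, assuming ext::map keeps its documented
semantics):

    #include <iterator>
    #include <vector>

    template <typename Collection, typename Mapper>
    auto mapSketch(const Collection & collection, Mapper mapper)   // return type deduced in C++14
    {
        std::vector<decltype(mapper(*std::begin(collection)))> result;
        for (const auto & value : collection)
            result.push_back(mapper(value));
        return result;
    }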