From ec0cee0afe29fe03fcb86c3390c4da7183815038 Mon Sep 17 00:00:00 2001 From: Andrey Mironov Date: Thu, 14 Aug 2014 15:50:36 +0400 Subject: [PATCH] escape user, password and host. [#METR-12221] --- dbms/include/DB/Interpreters/Cluster.h | 3 +- dbms/src/Interpreters/Cluster.cpp | 38 ++++++++++++++---------- dbms/src/Storages/StorageDistributed.cpp | 33 +++++++++++++++----- 3 files changed, 50 insertions(+), 24 deletions(-) diff --git a/dbms/include/DB/Interpreters/Cluster.h b/dbms/include/DB/Interpreters/Cluster.h index 5aca7670ae0..03eee47de71 100644 --- a/dbms/include/DB/Interpreters/Cluster.h +++ b/dbms/include/DB/Interpreters/Cluster.h @@ -33,7 +33,6 @@ public: { std::string dir_name; int weight; - bool internal_replication; }; std::vector shard_info_vec; std::vector slot_to_shard; @@ -67,7 +66,7 @@ public: Address(const String & host_port_, const String & user_, const String & password_); }; - static bool isLocal(const Address & address); + static bool addressIsLocal(const Poco::Net::SocketAddress & address); // private: /// Массив шардов. Каждый шард - адреса одного сервера. diff --git a/dbms/src/Interpreters/Cluster.cpp b/dbms/src/Interpreters/Cluster.cpp index 7e21f7aa99b..530001a24aa 100644 --- a/dbms/src/Interpreters/Cluster.cpp +++ b/dbms/src/Interpreters/Cluster.cpp @@ -1,4 +1,5 @@ #include +#include #include #include #include @@ -38,8 +39,10 @@ namespace inline std::string addressToDirName(const Cluster::Address & address) { return - address.user + (address.password.empty() ? "" : (':' + address.password)) + '@' + - address.host_port.host().toString() + ':' + std::to_string(address.host_port.port()); + escapeForFileName(address.user) + + (address.password.empty() ? "" : (':' + escapeForFileName(address.password))) + '@' + + escapeForFileName(address.host_port.host().toString()) + ':' + + std::to_string(address.host_port.port()); } } @@ -72,11 +75,12 @@ Cluster::Cluster(const Settings & settings, const DataTypeFactory & data_type_fa { const auto & prefix = config_prefix + *it; const auto weight = config.getInt(prefix + ".weight", 1); - const auto internal_replication = config.getBool(prefix + ".internal_replication", false); + if (weight == 0) + continue; addresses.emplace_back(prefix); slot_to_shard.insert(std::end(slot_to_shard), weight, shard_info_vec.size()); - shard_info_vec.push_back({addressToDirName(addresses.back()), weight, internal_replication}); + shard_info_vec.push_back({addressToDirName(addresses.back()), weight}); } else if (0 == strncmp(it->c_str(), "shard", strlen("shard"))) { @@ -88,14 +92,18 @@ Cluster::Cluster(const Settings & settings, const DataTypeFactory & data_type_fa const auto & partial_prefix = config_prefix + *it + "."; const auto weight = config.getInt(partial_prefix + ".weight", 1); - const auto internal_replication = config.getBool(partial_prefix + ".internal_replication", false); + if (weight == 0) + continue; + + // const auto internal_replication = config.getBool(partial_prefix + ".internal_replication", false); std::string dir_name{}; auto first = true; for (auto jt = replica_keys.begin(); jt != replica_keys.end(); ++jt) { - if (0 == strncmp(jt->data(), "weight", strlen("weight"))) + if (0 == strncmp(jt->data(), "weight", strlen("weight")) || + 0 == strncmp(jt->data(), "internal_replication", strlen("internal_replication"))) continue; if (0 == strncmp(jt->c_str(), "replica", strlen("replica"))) @@ -111,7 +119,7 @@ Cluster::Cluster(const Settings & settings, const DataTypeFactory & data_type_fa } slot_to_shard.insert(std::end(slot_to_shard), weight, shard_info_vec.size()); - shard_info_vec.push_back({dir_name, weight, internal_replication}); + shard_info_vec.push_back({dir_name, weight}); } else throw Exception("Unknown element in config: " + *it, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG); @@ -130,7 +138,7 @@ Cluster::Cluster(const Settings & settings, const DataTypeFactory & data_type_fa bool has_local_replics = false; for (Addresses::const_iterator jt = it->begin(); jt != it->end(); ++jt) { - if (isLocal(*jt)) + if (addressIsLocal(jt->host_port)) { has_local_replics = true; break; @@ -156,7 +164,7 @@ Cluster::Cluster(const Settings & settings, const DataTypeFactory & data_type_fa { for (Addresses::const_iterator it = addresses.begin(); it != addresses.end(); ++it) { - if (isLocal(*it)) + if (addressIsLocal(it->host_port)) { ++local_nodes_num; } @@ -215,20 +223,20 @@ Poco::Timespan Cluster::saturate(const Poco::Timespan & v, const Poco::Timespan } -bool Cluster::isLocal(const Address & address) +bool Cluster::addressIsLocal(const Poco::Net::SocketAddress & address) { /// Если среди реплик существует такая, что: /// - её порт совпадает с портом, который слушает сервер; /// - её хост резолвится в набор адресов, один из которых совпадает с одним из адресов сетевых интерфейсов сервера /// то нужно всегда ходить на этот шард без межпроцессного взаимодействия - UInt16 clickhouse_port = Poco::Util::Application::instance().config().getInt("tcp_port", 0); - static Poco::Net::NetworkInterface::NetworkInterfaceList interfaces = Poco::Net::NetworkInterface::list(); + const UInt16 clickhouse_port = Poco::Util::Application::instance().config().getInt("tcp_port", 0); + static auto interfaces = Poco::Net::NetworkInterface::list(); - if (clickhouse_port == address.host_port.port() && + if (clickhouse_port == address.port() && interfaces.end() != std::find_if(interfaces.begin(), interfaces.end(), - [&](const Poco::Net::NetworkInterface & interface) { return interface.address() == address.host_port.host(); })) + [&](const Poco::Net::NetworkInterface & interface) { return interface.address() == address.host(); })) { - LOG_INFO(&Poco::Util::Application::instance().logger(), "Replica with address " << address.host_port.toString() << " will be processed as local."); + LOG_INFO(&Poco::Util::Application::instance().logger(), "Replica with address " << address.toString() << " will be processed as local."); return true; } return false; diff --git a/dbms/src/Storages/StorageDistributed.cpp b/dbms/src/Storages/StorageDistributed.cpp index 3c7c57cfb0a..6aa2fe89018 100644 --- a/dbms/src/Storages/StorageDistributed.cpp +++ b/dbms/src/Storages/StorageDistributed.cpp @@ -259,7 +259,9 @@ void StorageDistributed::directoryMonitorFunc(const std::string & name) const auto & path = this->path + name + '/'; std::cout << "created monitor for directory " << path << std::endl; - // ConnectionPools pools; + auto is_local = false; + ConnectionPools pools; + for (auto it = boost::make_split_iterator(name, boost::first_finder(",")); it != decltype(it){}; ++it) { const auto & address = boost::copy_range(*it); @@ -270,25 +272,42 @@ void StorageDistributed::directoryMonitorFunc(const std::string & name) throw Exception{"Shard address '" + address + "' does not match to 'user[:password]@host:port' pattern"}; const auto has_pw = colon < user_pw_end; - const auto host_end = has_pw ? colon : strchr(user_pw_end + 1, ':'); + const auto host_end = has_pw ? strchr(user_pw_end + 1, ':') : colon; if (!host_end) throw Exception{"Shard address '" + address + "' does not contain port"}; - const std::string user{address.data(), has_pw ? colon : user_pw_end}; - const auto password = has_pw ? std::string{colon + 1, user_pw_end} : std::string{}; - const std::string host{user_pw_end + 1, host_end}; + const auto user = unescapeForFileName({address.data(), has_pw ? colon : user_pw_end}); + const auto password = has_pw ? unescapeForFileName({colon + 1, user_pw_end}) : std::string{}; + const auto host = unescapeForFileName({user_pw_end + 1, host_end}); const auto port = DB::parse(host_end + 1); - // pools.emplace_back(new ConnectionPool(1, host, port, remote_database, "default", "", getName() + '_' + name)); std::cout << "\taddress " << host << " port " << port << " user " << user << " password " << password << std::endl; + + if (Cluster::addressIsLocal({host, port})) + { + is_local = true; + break; + } + + pools.emplace_back(new ConnectionPool{ + 1, host, port, "", + user, password, context.getDataTypeFactory(), + getName() + '_' + name + }); } - // auto pool = pools.size() == 1 ? pools[0] : new ConnectionPoolWithFailover(pools, DB::LoadBalancing::RANDOM); + std::cout << "local? " << std::boolalpha << is_local << std::endl; + const auto pool = is_local + ? (pools.size() == 1 + ? pools[0] + : new ConnectionPoolWithFailover(pools, DB::LoadBalancing::RANDOM) + ) + : nullptr; while (!quit.load(std::memory_order_relaxed)) {