diff --git a/dbms/include/DB/Interpreters/Cluster.h b/dbms/include/DB/Interpreters/Cluster.h index 06f1ed38c85..aabddd59d04 100644 --- a/dbms/include/DB/Interpreters/Cluster.h +++ b/dbms/include/DB/Interpreters/Cluster.h @@ -19,7 +19,7 @@ public: Cluster(const Settings & settings, const String & cluster_name); /// Построить кластер по именам шардов и реплик. Локальные обрабатываются так же как удаленные. - Cluster(const Settings & settings, std::vector> names, + Cluster(const Settings & settings, const std::vector> & names, const String & username, const String & password); Cluster(const Cluster &) = delete; @@ -31,19 +31,20 @@ public: public: struct Address { - /** В конфиге адреса либо находятся в узлах : + /** In configuration file, + * addresses are located either in elements: * * example01-01-1 * 9000 - * + * * * ... - * либо в узлах , и внутри - + * or in and inside in elements: * * * example01-01-1 * 9000 - * + * * * */ @@ -52,6 +53,7 @@ public: UInt16 port; String user; String password; + String default_database; /// this database is selected when no database is specified for Distributed table UInt32 replica_num; Address(const String & config_prefix); diff --git a/dbms/include/DB/Storages/System/StorageSystemClusters.h b/dbms/include/DB/Storages/System/StorageSystemClusters.h index f8fb726a762..e293425ebda 100644 --- a/dbms/include/DB/Storages/System/StorageSystemClusters.h +++ b/dbms/include/DB/Storages/System/StorageSystemClusters.h @@ -7,8 +7,9 @@ namespace DB class Context; -/** Реализует системную таблицу columns, которая позволяет получить информацию - * о столбцах каждой таблицы для всех баз данных. +/** Implements system table 'clusters' + * that allows to obtain information about available clusters + * (which may be specified in Distributed tables). */ class StorageSystemClusters : public IStorage { diff --git a/dbms/src/Interpreters/Cluster.cpp b/dbms/src/Interpreters/Cluster.cpp index fac51fa6340..d6658f0188a 100644 --- a/dbms/src/Interpreters/Cluster.cpp +++ b/dbms/src/Interpreters/Cluster.cpp @@ -22,16 +22,22 @@ namespace ErrorCodes namespace { -/// Вес шарда по-умолчанию. +/// Default shard weight. static constexpr int default_weight = 1; inline bool isLocal(const Cluster::Address & address) { - /// Если среди реплик существует такая, что: - /// - её порт совпадает с портом, который слушает сервер; - /// - её хост резолвится в набор адресов, один из которых совпадает с одним из адресов сетевых интерфейсов сервера - /// то нужно всегда ходить на этот шард без межпроцессного взаимодействия - return isLocalAddress(address.resolved_address); + /// If there is replica, for which: + /// - its port is the same that the server is listening; + /// - its host is resolved to set of addresses, one of which is the same as one of addresses of network interfaces of the server machine*; + /// then we must go to this shard without any inter-process communication. + /// + /// * - this criteria is somewhat approximate. + /// + /// Also, replica is considered non-local, if it has default database set + /// (only reason is to avoid query rewrite). + + return address.default_database.empty() && isLocalAddress(address.resolved_address); } inline std::string addressToDirName(const Cluster::Address & address) @@ -43,7 +49,7 @@ inline std::string addressToDirName(const Cluster::Address & address) std::to_string(address.resolved_address.port()); } -/// Для кэширования DNS запросов. +/// To cache DNS requests. Poco::Net::SocketAddress resolveSocketAddressImpl1(const String & host, UInt16 port) { return Poco::Net::SocketAddress(host, port); @@ -68,7 +74,7 @@ Poco::Net::SocketAddress resolveSocketAddress(const String & host_and_port) } -/// Реализация класса Cluster::Address +/// Implementation of Cluster::Address class Cluster::Address::Address(const String & config_prefix) { @@ -79,6 +85,7 @@ Cluster::Address::Address(const String & config_prefix) resolved_address = resolveSocketAddress(host_name, port); user = config.getString(config_prefix + ".user", "default"); password = config.getString(config_prefix + ".password", ""); + default_database = config.getString(config_prefix + ".default_database", ""); } @@ -87,7 +94,7 @@ Cluster::Address::Address(const String & host_port_, const String & user_, const { UInt16 default_port = Poco::Util::Application::instance().config().getInt("tcp_port", 0); - /// Похоже на то, что строка host_port_ содержит порт. Если условие срабатывает - не обязательно значит, что порт есть (пример: [::]). + /// It's like that 'host_port_' string contains port. If condition is met, it doesn't necessarily mean that port exists (example: [::]). if ((nullptr != strchr(host_port_.c_str(), ':')) || !default_port) { resolved_address = resolveSocketAddress(host_port_); @@ -102,7 +109,7 @@ Cluster::Address::Address(const String & host_port_, const String & user_, const } } -/// Реализация класса Clusters +/// Implementation of Clusters class Clusters::Clusters(const Settings & settings, const String & config_name) { @@ -158,7 +165,7 @@ Cluster::Cluster(const Settings & settings, const String & cluster_name) info.pool = std::make_shared( settings.distributed_connections_pool_size, address.host_name, address.port, address.resolved_address, - "", address.user, address.password, + address.default_database, address.user, address.password, "server", Protocol::Compression::Enable, saturate(settings.connect_timeout, settings.limits.max_execution_time), saturate(settings.receive_timeout, settings.limits.max_execution_time), @@ -237,7 +244,7 @@ Cluster::Cluster(const Settings & settings, const String & cluster_name) replicas.emplace_back(std::make_shared( settings.distributed_connections_pool_size, replica.host_name, replica.port, replica.resolved_address, - "", replica.user, replica.password, + replica.default_database, replica.user, replica.password, "server", Protocol::Compression::Enable, saturate(settings.connect_timeout_with_failover_ms, settings.limits.max_execution_time), saturate(settings.receive_timeout, settings.limits.max_execution_time), @@ -265,7 +272,7 @@ Cluster::Cluster(const Settings & settings, const String & cluster_name) } -Cluster::Cluster(const Settings & settings, std::vector> names, +Cluster::Cluster(const Settings & settings, const std::vector> & names, const String & username, const String & password) { UInt32 current_shard_num = 1; @@ -286,7 +293,7 @@ Cluster::Cluster(const Settings & settings, std::vector> nam replicas.emplace_back(std::make_shared( settings.distributed_connections_pool_size, replica.host_name, replica.port, replica.resolved_address, - "", replica.user, replica.password, + replica.default_database, replica.user, replica.password, "server", Protocol::Compression::Enable, saturate(settings.connect_timeout_with_failover_ms, settings.limits.max_execution_time), saturate(settings.receive_timeout, settings.limits.max_execution_time), diff --git a/dbms/src/Storages/StorageFactory.cpp b/dbms/src/Storages/StorageFactory.cpp index 8ab85afc4ff..add83057dec 100644 --- a/dbms/src/Storages/StorageFactory.cpp +++ b/dbms/src/Storages/StorageFactory.cpp @@ -360,8 +360,16 @@ StoragePtr StorageFactory::get( } else if (name == "Distributed") { - /** В запросе в качестве аргумента для движка указано имя конфигурационной секции, - * в которой задан список удалённых серверов, а также имя удалённой БД и имя удалённой таблицы. + /** Arguments of engine is following: + * - name of cluster in configuration; + * - name of remote database; + * - name of remote table; + * + * Remote database may be specified in following form: + * - identifier; + * - constant expression with string result, like currentDatabase(); + * -- string literal as specific case; + * - empty string means 'use default database from cluster'. */ ASTs & args_func = typeid_cast(*typeid_cast(*query).storage).children; @@ -380,7 +388,7 @@ StoragePtr StorageFactory::get( String cluster_name = getClusterName(*args[0]); String remote_database = reinterpretAsIdentifier(args[1], local_context).name; - String remote_table = typeid_cast(*args[2]).name; + String remote_table = reinterpretAsIdentifier(args[2], local_context).name; const auto & sharding_key = args.size() == 4 ? args[3] : nullptr; diff --git a/dbms/src/Storages/System/StorageSystemClusters.cpp b/dbms/src/Storages/System/StorageSystemClusters.cpp index ea0e9a71923..9941abf7ef8 100644 --- a/dbms/src/Storages/System/StorageSystemClusters.cpp +++ b/dbms/src/Storages/System/StorageSystemClusters.cpp @@ -22,7 +22,8 @@ StorageSystemClusters::StorageSystemClusters(const std::string & name_, Context { "host_address", std::make_shared() }, { "port", std::make_shared() }, { "is_local", std::make_shared() }, - { "user", std::make_shared() } + { "user", std::make_shared() }, + { "default_database", std::make_shared() } } , context(context_) { @@ -54,6 +55,7 @@ BlockInputStreams StorageSystemClusters::read( ColumnPtr port_column = std::make_shared(); ColumnPtr is_local_column = std::make_shared(); ColumnPtr user_column = std::make_shared(); + ColumnPtr default_database_column = std::make_shared(); auto updateColumns = [&](const std::string & cluster_name, const Cluster::ShardInfo & shard_info, const Cluster::Address & address) @@ -68,6 +70,7 @@ BlockInputStreams StorageSystemClusters::read( port_column->insert(static_cast(address.port)); is_local_column->insert(static_cast(shard_info.isLocal())); user_column->insert(address.user); + default_database_column->insert(address.default_database); }; const auto & clusters = context.getClusters(); @@ -114,17 +117,19 @@ BlockInputStreams StorageSystemClusters::read( } } - Block block; - - block.insert(ColumnWithTypeAndName(cluster_column, std::make_shared(), "cluster")); - block.insert(ColumnWithTypeAndName(shard_num_column, std::make_shared(), "shard_num")); - block.insert(ColumnWithTypeAndName(shard_weight_column, std::make_shared(), "shard_weight")); - block.insert(ColumnWithTypeAndName(replica_num_column, std::make_shared(), "replica_num")); - block.insert(ColumnWithTypeAndName(host_name_column, std::make_shared(), "host_name")); - block.insert(ColumnWithTypeAndName(host_address_column, std::make_shared(), "host_address")); - block.insert(ColumnWithTypeAndName(port_column, std::make_shared(), "port")); - block.insert(ColumnWithTypeAndName(is_local_column, std::make_shared(), "is_local")); - block.insert(ColumnWithTypeAndName(user_column, std::make_shared(), "user")); + Block block + { + {cluster_column, std::make_shared(), "cluster"}, + {shard_num_column, std::make_shared(), "shard_num"}, + {shard_weight_column, std::make_shared(), "shard_weight"}, + {replica_num_column, std::make_shared(), "replica_num"}, + {host_name_column, std::make_shared(), "host_name"}, + {host_address_column, std::make_shared(), "host_address"}, + {port_column, std::make_shared(), "port"}, + {is_local_column, std::make_shared(), "is_local"}, + {user_column, std::make_shared(), "user"}, + {default_database_column, std::make_shared(), "default_database"} + }; return BlockInputStreams{ 1, std::make_shared(block) }; }