Added optional 'default_database' parameter for Cluster (allow Distributed tables to connect to different databases on different shards/replicas) [#METR-22462].

This commit is contained in:
Alexey Milovidov 2016-08-22 23:34:21 +03:00
parent 22b57b060c
commit e0f3cbb432
5 changed files with 59 additions and 36 deletions

View File

@ -19,7 +19,7 @@ public:
Cluster(const Settings & settings, const String & cluster_name);
/// Построить кластер по именам шардов и реплик. Локальные обрабатываются так же как удаленные.
Cluster(const Settings & settings, std::vector<std::vector<String>> names,
Cluster(const Settings & settings, const std::vector<std::vector<String>> & names,
const String & username, const String & password);
Cluster(const Cluster &) = delete;
@ -31,19 +31,20 @@ public:
public:
struct Address
{
/** В конфиге адреса либо находятся в узлах <node>:
/** In the configuration file,
* addresses are located either in <node> elements:
* <node>
* <host>example01-01-1</host>
* <port>9000</port>
* <!-- <user>, <password>, если нужны -->
* <!-- <user>, <password>, <default_database> if needed -->
* </node>
* ...
* либо в узлах <shard>, и внутри - <replica>
* or in <replica> elements nested inside <shard> elements:
* <shard>
* <replica>
* <host>example01-01-1</host>
* <port>9000</port>
* <!-- <user>, <password>, если нужны -->
* <!-- <user>, <password>, <default_database> if needed -->
* </replica>
* </shard>
*/
@ -52,6 +53,7 @@ public:
UInt16 port;
String user;
String password;
String default_database; /// this database is selected when no database is specified for Distributed table
UInt32 replica_num;
Address(const String & config_prefix);

View File

@ -7,8 +7,9 @@ namespace DB
class Context;
/** Реализует системную таблицу columns, которая позволяет получить информацию
* о столбцах каждой таблицы для всех баз данных.
/** Implements the system table 'clusters',
* which provides information about the available clusters
* (those that may be specified in Distributed tables).
*/
class StorageSystemClusters : public IStorage
{

View File

@ -22,16 +22,22 @@ namespace ErrorCodes
namespace
{
/// Вес шарда по-умолчанию.
/// Default shard weight.
static constexpr int default_weight = 1;
inline bool isLocal(const Cluster::Address & address)
{
/// Если среди реплик существует такая, что:
/// - её порт совпадает с портом, который слушает сервер;
/// - её хост резолвится в набор адресов, один из которых совпадает с одним из адресов сетевых интерфейсов сервера
/// то нужно всегда ходить на этот шард без межпроцессного взаимодействия
return isLocalAddress(address.resolved_address);
/// If there is a replica for which:
/// - its port is the same as the port the server is listening on;
/// - its host resolves to a set of addresses, one of which matches an address of one of the server machine's network interfaces*;
/// then we must go to this shard without any inter-process communication.
///
/// * - this criterion is somewhat approximate.
///
/// Also, a replica is considered non-local if it has a default database set
/// (the only reason is to avoid a query rewrite).
return address.default_database.empty() && isLocalAddress(address.resolved_address);
}
inline std::string addressToDirName(const Cluster::Address & address)
@ -43,7 +49,7 @@ inline std::string addressToDirName(const Cluster::Address & address)
std::to_string(address.resolved_address.port());
}
/// Для кэширования DNS запросов.
/// To cache DNS requests.
Poco::Net::SocketAddress resolveSocketAddressImpl1(const String & host, UInt16 port)
{
return Poco::Net::SocketAddress(host, port);
@ -68,7 +74,7 @@ Poco::Net::SocketAddress resolveSocketAddress(const String & host_and_port)
}
/// Реализация класса Cluster::Address
/// Implementation of Cluster::Address class
Cluster::Address::Address(const String & config_prefix)
{
@ -79,6 +85,7 @@ Cluster::Address::Address(const String & config_prefix)
resolved_address = resolveSocketAddress(host_name, port);
user = config.getString(config_prefix + ".user", "default");
password = config.getString(config_prefix + ".password", "");
default_database = config.getString(config_prefix + ".default_database", "");
}
@ -87,7 +94,7 @@ Cluster::Address::Address(const String & host_port_, const String & user_, const
{
UInt16 default_port = Poco::Util::Application::instance().config().getInt("tcp_port", 0);
/// Похоже на то, что строка host_port_ содержит порт. Если условие срабатывает - не обязательно значит, что порт есть (пример: [::]).
/// It looks like the 'host_port_' string contains a port. If the condition is met, it doesn't necessarily mean that a port is present (example: [::]).
if ((nullptr != strchr(host_port_.c_str(), ':')) || !default_port)
{
resolved_address = resolveSocketAddress(host_port_);
@ -102,7 +109,7 @@ Cluster::Address::Address(const String & host_port_, const String & user_, const
}
}
/// Реализация класса Clusters
/// Implementation of Clusters class
Clusters::Clusters(const Settings & settings, const String & config_name)
{
@ -158,7 +165,7 @@ Cluster::Cluster(const Settings & settings, const String & cluster_name)
info.pool = std::make_shared<ConnectionPool>(
settings.distributed_connections_pool_size,
address.host_name, address.port, address.resolved_address,
"", address.user, address.password,
address.default_database, address.user, address.password,
"server", Protocol::Compression::Enable,
saturate(settings.connect_timeout, settings.limits.max_execution_time),
saturate(settings.receive_timeout, settings.limits.max_execution_time),
@ -237,7 +244,7 @@ Cluster::Cluster(const Settings & settings, const String & cluster_name)
replicas.emplace_back(std::make_shared<ConnectionPool>(
settings.distributed_connections_pool_size,
replica.host_name, replica.port, replica.resolved_address,
"", replica.user, replica.password,
replica.default_database, replica.user, replica.password,
"server", Protocol::Compression::Enable,
saturate(settings.connect_timeout_with_failover_ms, settings.limits.max_execution_time),
saturate(settings.receive_timeout, settings.limits.max_execution_time),
@ -265,7 +272,7 @@ Cluster::Cluster(const Settings & settings, const String & cluster_name)
}
Cluster::Cluster(const Settings & settings, std::vector<std::vector<String>> names,
Cluster::Cluster(const Settings & settings, const std::vector<std::vector<String>> & names,
const String & username, const String & password)
{
UInt32 current_shard_num = 1;
@ -286,7 +293,7 @@ Cluster::Cluster(const Settings & settings, std::vector<std::vector<String>> nam
replicas.emplace_back(std::make_shared<ConnectionPool>(
settings.distributed_connections_pool_size,
replica.host_name, replica.port, replica.resolved_address,
"", replica.user, replica.password,
replica.default_database, replica.user, replica.password,
"server", Protocol::Compression::Enable,
saturate(settings.connect_timeout_with_failover_ms, settings.limits.max_execution_time),
saturate(settings.receive_timeout, settings.limits.max_execution_time),

View File

@ -360,8 +360,16 @@ StoragePtr StorageFactory::get(
}
else if (name == "Distributed")
{
/** В запросе в качестве аргумента для движка указано имя конфигурационной секции,
* в которой задан список удалённых серверов, а также имя удалённой БД и имя удалённой таблицы.
/** The engine arguments are as follows:
* - name of the cluster in the configuration;
* - name of the remote database;
* - name of the remote table;
*
* The remote database may be specified in one of the following forms:
* - an identifier;
* - a constant expression with a string result, like currentDatabase();
* -- a string literal is a special case of this;
* - an empty string means 'use the default database from the cluster'.
*/
ASTs & args_func = typeid_cast<ASTFunction &>(*typeid_cast<ASTCreateQuery &>(*query).storage).children;
@ -380,7 +388,7 @@ StoragePtr StorageFactory::get(
String cluster_name = getClusterName(*args[0]);
String remote_database = reinterpretAsIdentifier(args[1], local_context).name;
String remote_table = typeid_cast<ASTIdentifier &>(*args[2]).name;
String remote_table = reinterpretAsIdentifier(args[2], local_context).name;
const auto & sharding_key = args.size() == 4 ? args[3] : nullptr;

View File

@ -22,7 +22,8 @@ StorageSystemClusters::StorageSystemClusters(const std::string & name_, Context
{ "host_address", std::make_shared<DataTypeString>() },
{ "port", std::make_shared<DataTypeUInt16>() },
{ "is_local", std::make_shared<DataTypeUInt8>() },
{ "user", std::make_shared<DataTypeString>() }
{ "user", std::make_shared<DataTypeString>() },
{ "default_database", std::make_shared<DataTypeString>() }
}
, context(context_)
{
@ -54,6 +55,7 @@ BlockInputStreams StorageSystemClusters::read(
ColumnPtr port_column = std::make_shared<ColumnUInt16>();
ColumnPtr is_local_column = std::make_shared<ColumnUInt8>();
ColumnPtr user_column = std::make_shared<ColumnString>();
ColumnPtr default_database_column = std::make_shared<ColumnString>();
auto updateColumns = [&](const std::string & cluster_name, const Cluster::ShardInfo & shard_info,
const Cluster::Address & address)
@ -68,6 +70,7 @@ BlockInputStreams StorageSystemClusters::read(
port_column->insert(static_cast<UInt64>(address.port));
is_local_column->insert(static_cast<UInt64>(shard_info.isLocal()));
user_column->insert(address.user);
default_database_column->insert(address.default_database);
};
const auto & clusters = context.getClusters();
@ -114,17 +117,19 @@ BlockInputStreams StorageSystemClusters::read(
}
}
Block block;
block.insert(ColumnWithTypeAndName(cluster_column, std::make_shared<DataTypeString>(), "cluster"));
block.insert(ColumnWithTypeAndName(shard_num_column, std::make_shared<DataTypeUInt32>(), "shard_num"));
block.insert(ColumnWithTypeAndName(shard_weight_column, std::make_shared<DataTypeUInt32>(), "shard_weight"));
block.insert(ColumnWithTypeAndName(replica_num_column, std::make_shared<DataTypeUInt32>(), "replica_num"));
block.insert(ColumnWithTypeAndName(host_name_column, std::make_shared<DataTypeString>(), "host_name"));
block.insert(ColumnWithTypeAndName(host_address_column, std::make_shared<DataTypeString>(), "host_address"));
block.insert(ColumnWithTypeAndName(port_column, std::make_shared<DataTypeUInt16>(), "port"));
block.insert(ColumnWithTypeAndName(is_local_column, std::make_shared<DataTypeUInt8>(), "is_local"));
block.insert(ColumnWithTypeAndName(user_column, std::make_shared<DataTypeString>(), "user"));
Block block
{
{cluster_column, std::make_shared<DataTypeString>(), "cluster"},
{shard_num_column, std::make_shared<DataTypeUInt32>(), "shard_num"},
{shard_weight_column, std::make_shared<DataTypeUInt32>(), "shard_weight"},
{replica_num_column, std::make_shared<DataTypeUInt32>(), "replica_num"},
{host_name_column, std::make_shared<DataTypeString>(), "host_name"},
{host_address_column, std::make_shared<DataTypeString>(), "host_address"},
{port_column, std::make_shared<DataTypeUInt16>(), "port"},
{is_local_column, std::make_shared<DataTypeUInt8>(), "is_local"},
{user_column, std::make_shared<DataTypeString>(), "user"},
{default_database_column, std::make_shared<DataTypeString>(), "default_database"}
};
return BlockInputStreams{ 1, std::make_shared<OneBlockInputStream>(block) };
}