dbms: Cluster: improvement; TableFunctionRemote: allowed to omit port number [#METR-9750].

This commit is contained in:
Alexey Milovidov 2014-02-23 01:50:27 +04:00
parent 27928ba4ad
commit f5f95b7cd6
4 changed files with 59 additions and 45 deletions

View File

@ -13,10 +13,10 @@ namespace DB
/// С локальными узлами соединение не устанавливается, а выполняется запрос напрямую.
/// Поэтому храним только количество локальных узлов
/// В конфиге кластер включает в себя узлы <node> или <shard>
class Cluster
class Cluster : private boost::noncopyable
{
public:
Cluster(const Settings & settings, const DataTypeFactory & data_type_factory, const std::string & cluster_name);
Cluster(const Settings & settings, const DataTypeFactory & data_type_factory, const String & cluster_name);
/// Построить кластер по именам шардов и реплик, локальные обрабатываются так же как удаленные
Cluster(const Settings & settings, const DataTypeFactory & data_type_factory, std::vector< std::vector<String> > names,
@ -30,7 +30,7 @@ public:
ConnectionPools pools;
/// используется для выставления ограничения на размер таймаута
static Poco::Timespan saturation(const Poco::Timespan & v, const Poco::Timespan & limit);
static Poco::Timespan saturate(const Poco::Timespan & v, const Poco::Timespan & limit);
private:
struct Address
@ -55,11 +55,11 @@ private:
String user;
String password;
Address(const std::string & config_prefix);
Address(const Poco::Net::SocketAddress & host_port_, const String & user_, const String & password_);
Address(const String & config_prefix);
Address(const String & host_port_, const String & user_, const String & password_);
};
bool isLocal(const Address & address);
static bool isLocal(const Address & address);
/// Массив шардов. Каждый шард - адреса одного сервера.
typedef std::vector<Address> Addresses;
@ -73,9 +73,13 @@ private:
size_t local_nodes_num;
};
struct Clusters : public std::map<std::string, Cluster>
struct Clusters
{
typedef std::map<String, Cluster> Impl;
Impl impl;
Clusters(const Settings & settings, const DataTypeFactory & data_type_factory,
const std::string & config_name = "remote_servers");
const String & config_name = "remote_servers");
};
}

View File

@ -7,32 +7,44 @@
namespace DB
{
Cluster::Address::Address(const std::string & config_prefix)
Cluster::Address::Address(const String & config_prefix)
{
Poco::Util::AbstractConfiguration & config = Poco::Util::Application::instance().config();
host_port = Poco::Net::SocketAddress(config.getString(config_prefix + ".host"),
config.getInt(config_prefix + ".port"));
config.getInt(config_prefix + ".port"));
user = config.getString(config_prefix + ".user", "default");
password = config.getString(config_prefix + ".password", "");
}
Clusters::Clusters(const Settings & settings, const DataTypeFactory & data_type_factory, const std::string & config_name)
/** Build an address from a "host[:port]" string plus credentials.
  *
  * host_port_ - "host", "host:port", "[ipv6]", "[ipv6]:port" or a bare IPv6 literal such as "::1".
  * user_, password_ - credentials stored as is.
  *
  * When the port is omitted and the server config has a non-zero `tcp_port`, that value is used as the port.
  * When `tcp_port` is absent (read as 0), the string is handed to Poco unchanged, exactly as before.
  */
Cluster::Address::Address(const String & host_port_, const String & user_, const String & password_)
	: user(user_), password(password_)
{
	const UInt16 default_port = Poco::Util::Application::instance().config().getInt("tcp_port", 0);

	/// Host part to use when the default port has to be supplied separately.
	String host_only = host_port_;
	bool port_omitted = false;

	if (!host_port_.empty() && host_port_[0] == '[')
	{
		/// Bracketed IPv6 literal: a port can only follow the closing bracket ("[::1]:9000").
		/// If the string ends with ']', there is no port; strip the brackets to get the bare host.
		const size_t closing_bracket = host_port_.find(']');
		if (closing_bracket != String::npos && closing_bracket + 1 == host_port_.size())
		{
			port_omitted = true;
			host_only = host_port_.substr(1, closing_bracket - 1);
		}
	}
	else
	{
		/// No colon at all - plain host name or IPv4 address without a port.
		/// More than one colon without brackets - a bare IPv6 literal, which cannot carry a port.
		const size_t first_colon = host_port_.find(':');
		port_omitted = first_colon == String::npos || first_colon != host_port_.rfind(':');
	}

	if (port_omitted && default_port)
		host_port = Poco::Net::SocketAddress(host_only, default_port);
	else
		host_port = Poco::Net::SocketAddress(host_port_);	/// NOTE(review): Poco is expected to reject a string without a port here - confirm.
}
Clusters::Clusters(const Settings & settings, const DataTypeFactory & data_type_factory, const String & config_name)
{
Poco::Util::AbstractConfiguration & config = Poco::Util::Application::instance().config();
Poco::Util::AbstractConfiguration::Keys config_keys;
config.keys(config_name, config_keys);
for (Poco::Util::AbstractConfiguration::Keys::const_iterator it = config_keys.begin(); it != config_keys.end(); ++it)
{
insert(value_type(*it, Cluster(settings, data_type_factory, config_name + "." + *it)));
}
impl.emplace(std::piecewise_construct,
std::forward_as_tuple(*it),
std::forward_as_tuple(settings, data_type_factory, config_name + "." + *it));
}
Cluster::Address::Address(const Poco::Net::SocketAddress & host_port_, const String & user_, const String & password_)
: host_port(host_port_), user(user_), password(password_) {}
Cluster::Cluster(const Settings & settings, const DataTypeFactory & data_type_factory, const std::string & cluster_name):
Cluster::Cluster(const Settings & settings, const DataTypeFactory & data_type_factory, const String & cluster_name):
local_nodes_num(0)
{
Poco::Util::AbstractConfiguration & config = Poco::Util::Application::instance().config();
@ -90,9 +102,9 @@ Cluster::Cluster(const Settings & settings, const DataTypeFactory & data_type_fa
replicas.push_back(new ConnectionPool(
settings.distributed_connections_pool_size,
jt->host_port.host().toString(), jt->host_port.port(), "", jt->user, jt->password, data_type_factory, "server", Protocol::Compression::Enable,
saturation(settings.connect_timeout_with_failover_ms, settings.limits.max_execution_time),
saturation(settings.receive_timeout, settings.limits.max_execution_time),
saturation(settings.send_timeout, settings.limits.max_execution_time)));
saturate(settings.connect_timeout_with_failover_ms, settings.limits.max_execution_time),
saturate(settings.receive_timeout, settings.limits.max_execution_time),
saturate(settings.send_timeout, settings.limits.max_execution_time)));
}
}
@ -115,9 +127,9 @@ Cluster::Cluster(const Settings & settings, const DataTypeFactory & data_type_fa
pools.push_back(new ConnectionPool(
settings.distributed_connections_pool_size,
it->host_port.host().toString(), it->host_port.port(), "", it->user, it->password, data_type_factory, "server", Protocol::Compression::Enable,
saturation(settings.connect_timeout, settings.limits.max_execution_time),
saturation(settings.receive_timeout, settings.limits.max_execution_time),
saturation(settings.send_timeout, settings.limits.max_execution_time)));
saturate(settings.connect_timeout, settings.limits.max_execution_time),
saturate(settings.receive_timeout, settings.limits.max_execution_time),
saturate(settings.send_timeout, settings.limits.max_execution_time)));
}
}
}
@ -125,6 +137,7 @@ Cluster::Cluster(const Settings & settings, const DataTypeFactory & data_type_fa
throw Exception("No addresses listed in config", ErrorCodes::NO_ELEMENTS_IN_CONFIG);
}
Cluster::Cluster(const Settings & settings, const DataTypeFactory & data_type_factory, std::vector< std::vector<String> > names,
const String & username, const String & password): local_nodes_num(0)
{
@ -132,9 +145,10 @@ Cluster::Cluster(const Settings & settings, const DataTypeFactory & data_type_fa
{
Addresses current;
for (size_t j = 0; j < names[i].size(); ++j)
current.push_back(Address(Poco::Net::SocketAddress(names[i][j]), username, password));
current.push_back(Address(names[i][j], username, password));
addresses_with_failover.push_back(current);
}
for (AddressesWithFailover::const_iterator it = addresses_with_failover.begin(); it != addresses_with_failover.end(); ++it)
{
ConnectionPools replicas;
@ -145,15 +159,16 @@ Cluster::Cluster(const Settings & settings, const DataTypeFactory & data_type_fa
replicas.push_back(new ConnectionPool(
settings.distributed_connections_pool_size,
jt->host_port.host().toString(), jt->host_port.port(), "", jt->user, jt->password, data_type_factory, "server", Protocol::Compression::Enable,
saturation(settings.connect_timeout_with_failover_ms, settings.limits.max_execution_time),
saturation(settings.receive_timeout, settings.limits.max_execution_time),
saturation(settings.send_timeout, settings.limits.max_execution_time)));
saturate(settings.connect_timeout_with_failover_ms, settings.limits.max_execution_time),
saturate(settings.receive_timeout, settings.limits.max_execution_time),
saturate(settings.send_timeout, settings.limits.max_execution_time)));
}
pools.push_back(new ConnectionPoolWithFailover(replicas, settings.load_balancing, settings.connections_with_failover_max_tries));
}
}
Poco::Timespan Cluster::saturation(const Poco::Timespan & v, const Poco::Timespan & limit)
Poco::Timespan Cluster::saturate(const Poco::Timespan & v, const Poco::Timespan & limit)
{
if (limit.totalMicroseconds() == 0)
return v;
@ -161,10 +176,6 @@ Poco::Timespan Cluster::saturation(const Poco::Timespan & v, const Poco::Timespa
return v > limit ? limit : v;
}
/// Predicate for std::find_if over the list of network interfaces:
/// true if the address of `interface` equals `address`.
/// `address` is only read, so it is taken by const reference (temporaries and const objects are accepted too).
static bool interfaceEqual(const Poco::Net::NetworkInterface & interface, const Poco::Net::IPAddress & address)
{
	return interface.address() == address;
}
bool Cluster::isLocal(const Address & address)
{
@ -173,11 +184,11 @@ bool Cluster::isLocal(const Address & address)
/// - её хост резолвится в набор адресов, один из которых совпадает с одним из адресов сетевых интерфейсов сервера
/// то нужно всегда ходить на этот шард без межпроцессного взаимодействия
UInt16 clickhouse_port = Poco::Util::Application::instance().config().getInt("tcp_port", 0);
Poco::Net::NetworkInterface::NetworkInterfaceList interfaces = Poco::Net::NetworkInterface::list();
static Poco::Net::NetworkInterface::NetworkInterfaceList interfaces = Poco::Net::NetworkInterface::list();
if (clickhouse_port == address.host_port.port() &&
interfaces.end() != std::find_if(interfaces.begin(), interfaces.end(),
boost::bind(interfaceEqual, _1, address.host_port.host())))
[&](const Poco::Net::NetworkInterface & interface) { return interface.address() == address.host_port.host(); }))
{
LOG_INFO(&Poco::Util::Application::instance().logger(), "Replica with address " << address.host_port.toString() << " will be processed as local.");
return true;

View File

@ -491,9 +491,7 @@ void Context::initClusters()
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);
if (!shared->clusters)
{
shared->clusters = new Clusters(settings, shared->data_type_factory);
}
}
Cluster & Context::getCluster(const std::string & cluster_name)
@ -501,8 +499,8 @@ Cluster & Context::getCluster(const std::string & cluster_name)
if (!shared->clusters)
throw Poco::Exception("Clusters have not been initialized yet.");
Clusters::iterator it = shared->clusters->find(cluster_name);
if (it != shared->clusters->end())
Clusters::Impl::iterator it = shared->clusters->impl.find(cluster_name);
if (it != shared->clusters->impl.end())
return it->second;
else
throw Poco::Exception("Failed to find cluster with name = " + cluster_name);

View File

@ -131,7 +131,7 @@ BlockInputStreams StorageDistributed::read(
/// Установим sign_rewrite = 0, чтобы второй раз не переписывать запрос
Settings new_settings = settings;
new_settings.sign_rewrite = false;
new_settings.queue_max_wait_ms = Cluster::saturation(new_settings.queue_max_wait_ms, settings.limits.max_execution_time);
new_settings.queue_max_wait_ms = Cluster::saturate(new_settings.queue_max_wait_ms, settings.limits.max_execution_time);
/** Запрошены ли виртуальные столбцы?
* Если да - будем добавлять их в виде констант в запрос, предназначенный для выполнения на удалённом сервере,
@ -147,7 +147,7 @@ BlockInputStreams StorageDistributed::read(
need_port_column = true;
}
/** Есть ли виртуальные столбцы в секции селект?
/** Есть ли виртуальные столбцы в секции SELECT?
* Если нет - в случае вычисления запроса до стадии Complete, необходимо удалить их из блока.
*/
bool select_host_column = false;
@ -175,7 +175,7 @@ BlockInputStreams StorageDistributed::read(
VirtualColumnUtils::getVirtualColumnsBlocks(query->clone(), virtual_columns_block, context);
std::set< std::pair<String, UInt16> > values =
VirtualColumnUtils::extractTwoValuesFromBlocks<String, UInt16>(virtual_columns, _host_column_name, _port_column_name);
bool all_inclusive = (values.size() == virtual_columns_block.rows());
bool all_inclusive = values.size() == virtual_columns_block.rows();
size_t result_size = values.size();
if (values.find(std::make_pair("localhost", clickhouse_port)) != values.end())
@ -208,7 +208,7 @@ BlockInputStreams StorageDistributed::read(
need_port_column ? _port_column_name : "",
processed_stage);
if (processed_stage == QueryProcessingStage::WithMergeableState)
if (processed_stage == QueryProcessingStage::WithMergeableState || columns_to_remove.empty())
res.push_back(temp);
else
res.push_back(new RemoveColumnsBlockInputStream(temp, columns_to_remove));
@ -221,14 +221,14 @@ BlockInputStreams StorageDistributed::read(
need_host_column ? "localhost" : "",
need_port_column ? clickhouse_port : 0);
/// добавляем запросы к локальному ClickHouse
/// Добавляем запросы к локальному ClickHouse
DB::Context new_context = context;
new_context.setSettings(new_settings);
for(size_t i = 0; i < cluster.getLocalNodesNum(); ++i)
{
InterpreterSelectQuery interpreter(modified_query_ast, new_context, processed_stage);
if (processed_stage == QueryProcessingStage::WithMergeableState)
if (processed_stage == QueryProcessingStage::WithMergeableState || columns_to_remove.empty())
res.push_back(interpreter.execute());
else
res.push_back(new RemoveColumnsBlockInputStream(interpreter.execute(), columns_to_remove));
@ -248,7 +248,7 @@ Block StorageDistributed::getBlockWithVirtualColumns()
for (ConnectionPools::iterator it = cluster.pools.begin(); it != cluster.pools.end(); ++it)
{
_host.column->insert((*it)->get()->getHost());
_port.column->insert(static_cast<uint64>((*it)->get()->getPort()));
_port.column->insert(static_cast<UInt64>((*it)->get()->getPort()));
}
if (cluster.getLocalNodesNum() > 0)
@ -259,6 +259,7 @@ Block StorageDistributed::getBlockWithVirtualColumns()
_host.column->insert(clockhouse_host);
_port.column->insert(clickhouse_port);
}
res.insert(_host);
res.insert(_port);