Allowed unresolvable addresses in cluster configuration #5714

This commit is contained in:
Alexey Milovidov 2019-07-08 04:43:41 +03:00
parent d362ffb3ee
commit 4752dae9bb
7 changed files with 53 additions and 37 deletions

View File

@ -73,7 +73,7 @@ void Connection::connect(const ConnectionTimeouts & timeouts)
current_resolved_address = DNSResolver::instance().resolveAddress(host, port);
socket->connect(current_resolved_address, timeouts.connection_timeout);
socket->connect(*current_resolved_address, timeouts.connection_timeout);
socket->setReceiveTimeout(timeouts.receive_timeout);
socket->setSendTimeout(timeouts.send_timeout);
socket->setNoDelay(true);
@ -533,12 +533,9 @@ void Connection::sendExternalTablesData(ExternalTablesData & data)
LOG_DEBUG(log_wrapper.get(), msg.rdbuf());
}
Poco::Net::SocketAddress Connection::getResolvedAddress() const
std::optional<Poco::Net::SocketAddress> Connection::getResolvedAddress() const
{
if (connected)
return current_resolved_address;
return DNSResolver::instance().resolveAddress(host, port);
return current_resolved_address;
}
@ -720,11 +717,14 @@ void Connection::initBlockLogsInput()
void Connection::setDescription()
{
auto resolved_address = getResolvedAddress();
description = host + ":" + toString(resolved_address.port());
auto ip_address = resolved_address.host().toString();
description = host + ":" + toString(port);
if (host != ip_address)
description += ", " + ip_address;
if (resolved_address)
{
auto ip_address = resolved_address->host().toString();
if (host != ip_address)
description += ", " + ip_address;
}
}

View File

@ -63,7 +63,7 @@ public:
Poco::Timespan sync_request_timeout_ = Poco::Timespan(DBMS_DEFAULT_SYNC_REQUEST_TIMEOUT_SEC, 0))
:
host(host_), port(port_), default_database(default_database_),
user(user_), password(password_), current_resolved_address(host, port),
user(user_), password(password_),
client_name(client_name_),
compression(compression_),
secure(secure_),
@ -168,9 +168,6 @@ public:
size_t outBytesCount() const { return out ? out->count() : 0; }
size_t inBytesCount() const { return in ? in->count() : 0; }
/// Returns initially resolved address
Poco::Net::SocketAddress getResolvedAddress() const;
private:
String host;
UInt16 port;
@ -180,12 +177,15 @@ private:
/// Address is resolved during the first connection (or the following reconnects)
/// Use it only for logging purposes
Poco::Net::SocketAddress current_resolved_address;
std::optional<Poco::Net::SocketAddress> current_resolved_address;
/// For messages in log and in exceptions.
String description;
void setDescription();
/// Returns resolved address if it was resolved.
std::optional<Poco::Net::SocketAddress> getResolvedAddress() const;
String client_name;
bool connected = false;

View File

@ -23,9 +23,7 @@ namespace DB
* - the routing rules that affect which network interface we go to the specified address are not checked.
*/
bool isLocalAddress(const Poco::Net::SocketAddress & address, UInt16 clickhouse_port);
bool isLocalAddress(const Poco::Net::SocketAddress & address);
bool isLocalAddress(const Poco::Net::IPAddress & address);
/// Returns number of different bytes in hostnames, used for load balancing

View File

@ -29,9 +29,9 @@ namespace
/// Default shard weight.
static constexpr UInt32 default_weight = 1;
inline bool isLocal(const Cluster::Address & address, const Poco::Net::SocketAddress & resolved_address, UInt16 clickhouse_port)
inline bool isLocalImpl(const Cluster::Address & address, const Poco::Net::SocketAddress & resolved_address, UInt16 clickhouse_port)
{
/// If there is replica, for which:
/// If there is replica, for which:
/// - its port is the same that the server is listening;
/// - its host is resolved to set of addresses, one of which is the same as one of addresses of network interfaces of the server machine*;
/// then we must go to this shard without any inter-process communication.
@ -48,10 +48,31 @@ inline bool isLocal(const Cluster::Address & address, const Poco::Net::SocketAdd
/// Implementation of Cluster::Address class
std::optional<Poco::Net::SocketAddress> Cluster::Address::getResolvedAddress() const
{
try
{
return DNSResolver::instance().resolveAddress(host_name, port);
}
catch (...)
{
/// Failure in DNS resolution in cluster initialization is Ok.
tryLogCurrentException("Cluster");
return {};
}
}
bool Cluster::Address::isLocal(UInt16 clickhouse_port) const
{
if (auto resolved = getResolvedAddress())
return isLocalImpl(*this, *resolved, clickhouse_port);
return false;
}
Cluster::Address::Address(const Poco::Util::AbstractConfiguration & config, const String & config_prefix)
{
UInt16 clickhouse_port = static_cast<UInt16>(config.getInt("tcp_port", 0));
host_name = config.getString(config_prefix + ".host");
port = static_cast<UInt16>(config.getInt(config_prefix + ".port"));
if (config.has(config_prefix + ".user"))
@ -60,10 +81,9 @@ Cluster::Address::Address(const Poco::Util::AbstractConfiguration & config, cons
user = config.getString(config_prefix + ".user", "default");
password = config.getString(config_prefix + ".password", "");
default_database = config.getString(config_prefix + ".default_database", "");
initially_resolved_address = DNSResolver::instance().resolveAddress(host_name, port);
is_local = isLocal(*this, initially_resolved_address, clickhouse_port);
secure = config.getBool(config_prefix + ".secure", false) ? Protocol::Secure::Enable : Protocol::Secure::Disable;
compression = config.getBool(config_prefix + ".compression", true) ? Protocol::Compression::Enable : Protocol::Compression::Disable;
is_local = isLocal(config.getInt("tcp_port", 0));
}
@ -74,9 +94,7 @@ Cluster::Address::Address(const String & host_port_, const String & user_, const
host_name = parsed_host_port.first;
port = parsed_host_port.second;
secure = secure_ ? Protocol::Secure::Enable : Protocol::Secure::Disable;
initially_resolved_address = DNSResolver::instance().resolveAddress(parsed_host_port.first, parsed_host_port.second);
is_local = isLocal(*this, initially_resolved_address, clickhouse_port);
is_local = isLocal(clickhouse_port);
}

View File

@ -60,7 +60,7 @@ public:
/// This database is selected when no database is specified for Distributed table
String default_database;
/// The locality is determined at the initialization, and is not changed even if DNS is changed
bool is_local;
bool is_local = false;
bool user_specified = false;
Protocol::Compression compression = Protocol::Compression::Enable;
@ -84,17 +84,14 @@ public:
String toFullString() const;
static Address fromFullString(const String & address_full_string);
/// Returns initially resolved address
Poco::Net::SocketAddress getResolvedAddress() const
{
return initially_resolved_address;
}
/// Returns resolved address if it does resolve.
std::optional<Poco::Net::SocketAddress> getResolvedAddress() const;
auto tuple() const { return std::tie(host_name, port, secure, user, password, default_database); }
bool operator==(const Address & other) const { return tuple() == other.tuple(); }
private:
Poco::Net::SocketAddress initially_resolved_address;
bool isLocal(UInt16 clickhouse_port) const;
};
using Addresses = std::vector<Address>;

View File

@ -506,8 +506,9 @@ void DDLWorker::parseQueryAndResolveHost(DDLTask & task)
{
const Cluster::Address & address = shards[shard_num][replica_num];
if (isLocalAddress(address.getResolvedAddress(), context.getTCPPort())
|| (context.getTCPPortSecure() && isLocalAddress(address.getResolvedAddress(), *context.getTCPPortSecure())))
if (auto resolved = address.getResolvedAddress();
resolved && (isLocalAddress(*resolved, context.getTCPPort())
|| (context.getTCPPortSecure() && isLocalAddress(*resolved, *context.getTCPPortSecure()))))
{
if (found_via_resolving)
{

View File

@ -10,7 +10,8 @@ namespace DB
NamesAndTypesList StorageSystemClusters::getNamesAndTypes()
{
return {
return
{
{"cluster", std::make_shared<DataTypeString>()},
{"shard_num", std::make_shared<DataTypeUInt32>()},
{"shard_weight", std::make_shared<DataTypeUInt32>()},
@ -48,7 +49,8 @@ void StorageSystemClusters::fillData(MutableColumns & res_columns, const Context
res_columns[i++]->insert(shard_info.weight);
res_columns[i++]->insert(replica_index + 1);
res_columns[i++]->insert(address.host_name);
res_columns[i++]->insert(DNSResolver::instance().resolveHost(address.host_name).toString());
auto resolved = address.getResolvedAddress();
res_columns[i++]->insert(resolved ? resolved->host().toString() : String());
res_columns[i++]->insert(address.port);
res_columns[i++]->insert(shard_info.isLocal());
res_columns[i++]->insert(address.user);