diff --git a/dbms/programs/server/Server.cpp b/dbms/programs/server/Server.cpp index 23b7efa7193..c40fa945cef 100644 --- a/dbms/programs/server/Server.cpp +++ b/dbms/programs/server/Server.cpp @@ -525,8 +525,8 @@ int Server::main(const std::vector & /*args*/) } else { - /// Initialize a watcher updating DNS cache in case of network errors - dns_cache_updater = std::make_unique(*global_context); + /// Initialize a watcher periodically updating DNS cache + dns_cache_updater = std::make_unique(*global_context, config().getInt("dns_cache_update_period", 15)); } #if defined(__linux__) @@ -773,6 +773,8 @@ int Server::main(const std::vector & /*args*/) main_config_reloader->start(); users_config_reloader->start(); + if (dns_cache_updater) + dns_cache_updater->start(); { std::stringstream message; @@ -823,6 +825,7 @@ int Server::main(const std::vector & /*args*/) log, "Closed connections." << (current_connections ? " But " + toString(current_connections) + " remains." " Tip: To increase wait time add to config: 60" : "")); + dns_cache_updater.reset(); main_config_reloader.reset(); users_config_reloader.reset(); }); diff --git a/dbms/src/Common/DNSResolver.cpp b/dbms/src/Common/DNSResolver.cpp index b75498a9e0c..5314068b291 100644 --- a/dbms/src/Common/DNSResolver.cpp +++ b/dbms/src/Common/DNSResolver.cpp @@ -1,14 +1,23 @@ #include "DNSResolver.h" #include #include +#include +#include #include #include #include #include +#include +#include #include #include #include +namespace ProfileEvents +{ + extern Event NetworkErrors; +} + namespace DB { @@ -67,7 +76,6 @@ static void splitHostAndPort(const std::string & host_and_port, std::string & ou } } - static Poco::Net::IPAddress resolveIPAddressImpl(const std::string & host) { /// NOTE: Poco::Net::DNS::resolveOne(host) doesn't work for IP addresses like 127.0.0.2 @@ -75,15 +83,22 @@ static Poco::Net::IPAddress resolveIPAddressImpl(const std::string & host) return Poco::Net::SocketAddress(host, 0U).host(); } - struct DNSResolver::Impl { SimpleCache cache_host; + std::mutex drop_mutex; + std::mutex update_mutex; + /// Cached server host name - std::mutex mutex; std::optional host_name; + /// Store hosts, which was asked to resolve from last update of DNS cache. + NameSet new_hosts; + + /// Store all hosts, which was whenever asked to resolve + NameSet known_hosts; + /// If disabled, will not make cache lookups, will resolve addresses manually on each call std::atomic disable_cache{false}; }; @@ -93,28 +108,43 @@ DNSResolver::DNSResolver() : impl(std::make_unique()) {} Poco::Net::IPAddress DNSResolver::resolveHost(const std::string & host) { - return !impl->disable_cache ? impl->cache_host(host) : resolveIPAddressImpl(host); + if (impl->disable_cache) + return resolveIPAddressImpl(host); + + addToNewHosts(host); + return impl->cache_host(host); } Poco::Net::SocketAddress DNSResolver::resolveAddress(const std::string & host_and_port) { + if (impl->disable_cache) + return Poco::Net::SocketAddress(host_and_port); + String host; UInt16 port; splitHostAndPort(host_and_port, host, port); - return !impl->disable_cache ? Poco::Net::SocketAddress(impl->cache_host(host), port) : Poco::Net::SocketAddress(host_and_port); + addToNewHosts(host); + return Poco::Net::SocketAddress(impl->cache_host(host), port); } Poco::Net::SocketAddress DNSResolver::resolveAddress(const std::string & host, UInt16 port) { - return !impl->disable_cache ? Poco::Net::SocketAddress(impl->cache_host(host), port) : Poco::Net::SocketAddress(host, port); + if (impl->disable_cache) + return Poco::Net::SocketAddress(host, port); + + addToNewHosts(host); + return Poco::Net::SocketAddress(impl->cache_host(host), port); } void DNSResolver::dropCache() { impl->cache_host.drop(); - std::unique_lock lock(impl->mutex); + std::scoped_lock lock(impl->update_mutex, impl->drop_mutex); + + impl->known_hosts.clear(); + impl->new_hosts.clear(); impl->host_name.reset(); } @@ -128,7 +158,7 @@ String DNSResolver::getHostName() if (impl->disable_cache) return Poco::Net::DNS::hostName(); - std::unique_lock lock(impl->mutex); + std::lock_guard lock(impl->drop_mutex); if (!impl->host_name.has_value()) impl->host_name.emplace(Poco::Net::DNS::hostName()); @@ -136,6 +166,61 @@ String DNSResolver::getHostName() return *impl->host_name; } +bool DNSResolver::updateCache() +{ + { + std::lock_guard lock(impl->drop_mutex); + for (auto & host : impl->new_hosts) + impl->known_hosts.insert(std::move(host)); + impl->new_hosts.clear(); + + impl->host_name.emplace(Poco::Net::DNS::hostName()); + } + + std::lock_guard lock(impl->update_mutex); + + bool updated = false; + String lost_hosts; + for (const auto & host : impl->known_hosts) + { + try + { + updated |= updateHost(host); + } + catch (const Poco::Net::NetException &) + { + ProfileEvents::increment(ProfileEvents::NetworkErrors); + + if (!lost_hosts.empty()) + lost_hosts += ", "; + lost_hosts += host; + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + } + } + + if (!lost_hosts.empty()) + LOG_INFO(&Logger::get("DNSResolver"), "Cached hosts not found: " << lost_hosts); + + return updated; +} + +bool DNSResolver::updateHost(const String & host) +{ + /// Usage of updateHost implies that host is already in cache and there is no extra computations + auto old_value = impl->cache_host(host); + impl->cache_host.update(host); + return old_value != impl->cache_host(host); +} + +void DNSResolver::addToNewHosts(const String & host) +{ + std::lock_guard lock(impl->drop_mutex); + impl->new_hosts.insert(host); +} + DNSResolver::~DNSResolver() = default; diff --git a/dbms/src/Common/DNSResolver.h b/dbms/src/Common/DNSResolver.h index 097e646fa65..df646be9981 100644 --- a/dbms/src/Common/DNSResolver.h +++ b/dbms/src/Common/DNSResolver.h @@ -4,13 +4,15 @@ #include #include #include +#include namespace DB { -/// A singleton implementing DNS names resolving with optional permanent DNS cache -/// The cache could be updated only manually via drop() method +/// A singleton implementing DNS names resolving with optional DNS cache +/// The cache is being updated asynchronous in separate thread (see DNSCacheUpdater) +/// or it could be updated manually via drop() method. class DNSResolver : public ext::singleton { public: @@ -34,9 +36,13 @@ public: /// Drops all caches void dropCache(); + /// Updates all known hosts in cache. + /// Returns true if IP of any host has been changed. + bool updateCache(); + ~DNSResolver(); -protected: +private: DNSResolver(); @@ -44,6 +50,11 @@ protected: struct Impl; std::unique_ptr impl; + + /// Returns true if IP of host has been changed. + bool updateHost(const String & host); + + void addToNewHosts(const String & host); }; } diff --git a/dbms/src/Interpreters/DNSCacheUpdater.cpp b/dbms/src/Interpreters/DNSCacheUpdater.cpp index 80ea1258f48..66686dd9010 100644 --- a/dbms/src/Interpreters/DNSCacheUpdater.cpp +++ b/dbms/src/Interpreters/DNSCacheUpdater.cpp @@ -2,78 +2,31 @@ #include #include #include -#include -#include -#include - - -namespace ProfileEvents -{ - extern Event NetworkErrors; -} - namespace DB { -namespace ErrorCodes -{ - extern const int TIMEOUT_EXCEEDED; - extern const int ALL_CONNECTION_TRIES_FAILED; -} - - -/// Call it inside catch section -/// Returns true if it is a network error -static bool isNetworkError() -{ - try - { - throw; - } - catch (const Exception & e) - { - if (e.code() == ErrorCodes::TIMEOUT_EXCEEDED || e.code() == ErrorCodes::ALL_CONNECTION_TRIES_FAILED) - return true; - } - catch (Poco::Net::DNSException &) - { - return true; - } - catch (Poco::TimeoutException &) - { - return true; - } - catch (...) - { - /// Do nothing - } - - return false; -} - - -DNSCacheUpdater::DNSCacheUpdater(Context & context_) - : context(context_), pool(context_.getSchedulePool()) +DNSCacheUpdater::DNSCacheUpdater(Context & context_, Int32 update_period_seconds_) + : context(context_), + update_period_seconds(update_period_seconds_), + pool(context_.getSchedulePool()) { task_handle = pool.createTask("DNSCacheUpdater", [this]{ run(); }); } void DNSCacheUpdater::run() { - auto num_current_network_exceptions = ProfileEvents::global_counters[ProfileEvents::NetworkErrors].load(std::memory_order_relaxed); - if (num_current_network_exceptions >= last_num_network_erros + min_errors_to_update_cache) + watch.restart(); + auto & resolver = DNSResolver::instance(); + + /// Reload cluster config if IP of any host has been changed since last update. + if (resolver.updateCache()) { + LOG_INFO(&Poco::Logger::get("DNSCacheUpdater"), + "IPs of some hosts have been changed. Will reload cluster config."); try { - LOG_INFO(&Poco::Logger::get("DNSCacheUpdater"), "Updating DNS cache"); - - DNSResolver::instance().dropCache(); context.reloadClusterConfig(); - - last_num_network_erros = num_current_network_exceptions; - task_handle->scheduleAfter(min_update_period_seconds * 1000); - return; } catch (...) { @@ -81,19 +34,18 @@ void DNSCacheUpdater::run() } } - task_handle->scheduleAfter(10 * 1000); + auto interval_ms = std::max(0, update_period_seconds * 1000 - static_cast(watch.elapsedMilliseconds())); + task_handle->scheduleAfter(interval_ms); } -bool DNSCacheUpdater::incrementNetworkErrorEventsIfNeeded() +void DNSCacheUpdater::start() { - if (isNetworkError()) - { - ProfileEvents::increment(ProfileEvents::NetworkErrors); - return true; - } + task_handle->activateAndSchedule(); +} - return false; +DNSCacheUpdater::~DNSCacheUpdater() +{ + task_handle->deactivate(); } } - diff --git a/dbms/src/Interpreters/DNSCacheUpdater.h b/dbms/src/Interpreters/DNSCacheUpdater.h index 6d34697c401..63f6dba7285 100644 --- a/dbms/src/Interpreters/DNSCacheUpdater.h +++ b/dbms/src/Interpreters/DNSCacheUpdater.h @@ -1,10 +1,6 @@ -#pragma once - -#include -#include -#include #include +#include namespace DB @@ -12,26 +8,23 @@ namespace DB class Context; -/// Add a task to BackgroundProcessingPool that watch for ProfileEvents::NetworkErrors and updates DNS cache if it has increased +/// Add a task to BackgroundProcessingPool that resolves all hosts and updates cache with constant period. class DNSCacheUpdater { public: - explicit DNSCacheUpdater(Context & context); - - /// Checks if it is a network error and increments ProfileEvents::NetworkErrors - static bool incrementNetworkErrorEventsIfNeeded(); + explicit DNSCacheUpdater(Context & context, Int32 update_period_seconds_); + ~DNSCacheUpdater(); + void start(); private: void run(); Context & context; + Int32 update_period_seconds; + BackgroundSchedulePool & pool; BackgroundSchedulePoolTaskHolder task_handle; - - size_t last_num_network_erros = 0; - - static constexpr size_t min_errors_to_update_cache = 3; - static constexpr time_t min_update_period_seconds = 45; + Stopwatch watch; }; diff --git a/dbms/src/Interpreters/executeQuery.cpp b/dbms/src/Interpreters/executeQuery.cpp index 1dfb7def86b..4b6a5da2d67 100644 --- a/dbms/src/Interpreters/executeQuery.cpp +++ b/dbms/src/Interpreters/executeQuery.cpp @@ -431,8 +431,6 @@ static std::tuple executeQueryImpl( if (!internal) onExceptionBeforeStart(query, context, current_time); - DNSCacheUpdater::incrementNetworkErrorEventsIfNeeded(); - throw; } diff --git a/dbms/src/Storages/MergeTree/BackgroundProcessingPool.cpp b/dbms/src/Storages/MergeTree/BackgroundProcessingPool.cpp index b63ffeefd5e..a883946bc78 100644 --- a/dbms/src/Storages/MergeTree/BackgroundProcessingPool.cpp +++ b/dbms/src/Storages/MergeTree/BackgroundProcessingPool.cpp @@ -206,7 +206,6 @@ void BackgroundProcessingPool::threadFunction() catch (...) { tryLogCurrentException(__PRETTY_FUNCTION__); - DNSCacheUpdater::incrementNetworkErrorEventsIfNeeded(); } if (shutdown) diff --git a/dbms/tests/integration/test_host_ip_change/configs/dns_update_long.xml b/dbms/tests/integration/test_host_ip_change/configs/dns_update_long.xml new file mode 100644 index 00000000000..133e3cf725a --- /dev/null +++ b/dbms/tests/integration/test_host_ip_change/configs/dns_update_long.xml @@ -0,0 +1,3 @@ + + 200 + diff --git a/dbms/tests/integration/test_host_ip_change/configs/dns_update_short.xml b/dbms/tests/integration/test_host_ip_change/configs/dns_update_short.xml new file mode 100644 index 00000000000..2bfafe2ef21 --- /dev/null +++ b/dbms/tests/integration/test_host_ip_change/configs/dns_update_short.xml @@ -0,0 +1,3 @@ + + 2 + diff --git a/dbms/tests/integration/test_host_ip_change/configs/remote_servers.xml b/dbms/tests/integration/test_host_ip_change/configs/remote_servers.xml index 538aa72d386..1b9356d59bd 100644 --- a/dbms/tests/integration/test_host_ip_change/configs/remote_servers.xml +++ b/dbms/tests/integration/test_host_ip_change/configs/remote_servers.xml @@ -1,14 +1,13 @@ - - - true - - shard_0 - node1 - 9000 - - - + + + false + + lost_host + 9000 + + + diff --git a/dbms/tests/integration/test_host_ip_change/test.py b/dbms/tests/integration/test_host_ip_change/test.py index 19280720488..3db6331f344 100644 --- a/dbms/tests/integration/test_host_ip_change/test.py +++ b/dbms/tests/integration/test_host_ip_change/test.py @@ -4,28 +4,33 @@ import pytest import subprocess from helpers.cluster import ClickHouseCluster from helpers.test_tools import assert_eq_with_retry +from helpers.client import QueryRuntimeException +from helpers.test_tools import TSV cluster = ClickHouseCluster(__file__) -node1 = cluster.add_instance('node1', main_configs=['configs/remote_servers.xml', 'configs/listen_host.xml'], with_zookeeper=True, ipv6_address='2001:3984:3989::1:1111') -node2 = cluster.add_instance('node2', main_configs=['configs/remote_servers.xml', 'configs/listen_host.xml'], with_zookeeper=True, ipv6_address='2001:3984:3989::1:1112') +def _fill_nodes(nodes, table_name): + for node in nodes: + node.query( + ''' + CREATE DATABASE IF NOT EXISTS test; + CREATE TABLE IF NOT EXISTS {0}(date Date, id UInt32) + ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/{0}', '{1}') + ORDER BY id PARTITION BY toYYYYMM(date); + '''.format(table_name, node.name) + ) +node1 = cluster.add_instance('node1', main_configs=['configs/listen_host.xml'], with_zookeeper=True, ipv6_address='2001:3984:3989::1:1111') +node2 = cluster.add_instance('node2', main_configs=['configs/listen_host.xml', 'configs/dns_update_long.xml'], + with_zookeeper=True, ipv6_address='2001:3984:3989::1:1112') @pytest.fixture(scope="module") -def start_cluster(): +def cluster_without_dns_cache_update(): try: cluster.start() - for node in [node1, node2]: - node.query( - ''' - CREATE DATABASE IF NOT EXISTS test; - CREATE TABLE IF NOT EXISTS test_table(date Date, id UInt32) - ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/replicated', '{}') - ORDER BY id PARTITION BY toYYYYMM(date); - '''.format(node.name) - ) + _fill_nodes([node1, node2], 'test_table_drop') yield cluster @@ -36,31 +41,101 @@ def start_cluster(): cluster.shutdown() pass - -def test_merge_doesnt_work_without_zookeeper(start_cluster): +# node1 is a source, node2 downloads data +# node2 has long dns_cache_update_period, so dns cache update wouldn't work +def test_ip_change_drop_dns_cache(cluster_without_dns_cache_update): # First we check, that normal replication works - node1.query("INSERT INTO test_table VALUES ('2018-10-01', 1), ('2018-10-02', 2), ('2018-10-03', 3)") - assert node1.query("SELECT count(*) from test_table") == "3\n" - assert_eq_with_retry(node2, "SELECT count(*) from test_table", "3") + node1.query("INSERT INTO test_table_drop VALUES ('2018-10-01', 1), ('2018-10-02', 2), ('2018-10-03', 3)") + assert node1.query("SELECT count(*) from test_table_drop") == "3\n" + assert_eq_with_retry(node2, "SELECT count(*) from test_table_drop", "3") # We change source node ip cluster.restart_instance_with_ip_change(node1, "2001:3984:3989::1:7777") # Put some data to source node1 - node1.query("INSERT INTO test_table VALUES ('2018-10-01', 5), ('2018-10-02', 6), ('2018-10-03', 7)") + node1.query("INSERT INTO test_table_drop VALUES ('2018-10-01', 5), ('2018-10-02', 6), ('2018-10-03', 7)") # Check that data is placed on node1 - assert node1.query("SELECT count(*) from test_table") == "6\n" + assert node1.query("SELECT count(*) from test_table_drop") == "6\n" # Because of DNS cache dest node2 cannot download data from node1 with pytest.raises(Exception): - assert_eq_with_retry(node2, "SELECT count(*) from test_table", "6") + assert_eq_with_retry(node2, "SELECT count(*) from test_table_drop", "6") # drop DNS cache node2.query("SYSTEM DROP DNS CACHE") # Data is downloaded - assert_eq_with_retry(node2, "SELECT count(*) from test_table", "6") + assert_eq_with_retry(node2, "SELECT count(*) from test_table_drop", "6") # Just to be sure check one more time - node1.query("INSERT INTO test_table VALUES ('2018-10-01', 8)") - assert node1.query("SELECT count(*) from test_table") == "7\n" - assert_eq_with_retry(node2, "SELECT count(*) from test_table", "7") + node1.query("INSERT INTO test_table_drop VALUES ('2018-10-01', 8)") + assert node1.query("SELECT count(*) from test_table_drop") == "7\n" + assert_eq_with_retry(node2, "SELECT count(*) from test_table_drop", "7") + + +node3 = cluster.add_instance('node3', main_configs=['configs/listen_host.xml'], + with_zookeeper=True, ipv6_address='2001:3984:3989::1:1113') +node4 = cluster.add_instance('node4', main_configs=['configs/remote_servers.xml', 'configs/listen_host.xml', 'configs/dns_update_short.xml'], + with_zookeeper=True, ipv6_address='2001:3984:3989::1:1114') + +@pytest.fixture(scope="module") +def cluster_with_dns_cache_update(): + try: + cluster.start() + + _fill_nodes([node3, node4], 'test_table_update') + + yield cluster + + except Exception as ex: + print ex + + finally: + cluster.shutdown() + pass + +# node3 is a source, node4 downloads data +# node4 has short dns_cache_update_period, so testing update of dns cache +def test_ip_change_update_dns_cache(cluster_with_dns_cache_update): + # First we check, that normal replication works + node3.query("INSERT INTO test_table_update VALUES ('2018-10-01', 1), ('2018-10-02', 2), ('2018-10-03', 3)") + assert node3.query("SELECT count(*) from test_table_update") == "3\n" + assert_eq_with_retry(node4, "SELECT count(*) from test_table_update", "3") + + # We change source node ip + cluster.restart_instance_with_ip_change(node3, "2001:3984:3989::1:8888") + + # Put some data to source node3 + node3.query("INSERT INTO test_table_update VALUES ('2018-10-01', 5), ('2018-10-02', 6), ('2018-10-03', 7)") + # Check that data is placed on node3 + assert node3.query("SELECT count(*) from test_table_update") == "6\n" + + # Because of DNS cache update, ip of node3 would be updated + assert_eq_with_retry(node4, "SELECT count(*) from test_table_update", "6") + + # Just to be sure check one more time + node3.query("INSERT INTO test_table_update VALUES ('2018-10-01', 8)") + assert node3.query("SELECT count(*) from test_table_update") == "7\n" + assert_eq_with_retry(node4, "SELECT count(*) from test_table_update", "7") + +def test_dns_cache_update(cluster_with_dns_cache_update): + node4.exec_in_container(['bash', '-c', 'echo 127.0.0.1 localhost > /etc/hosts'], privileged=True, user='root') + node4.exec_in_container(['bash', '-c', 'echo ::1 localhost >> /etc/hosts'], privileged=True, user='root') + node4.exec_in_container(['bash', '-c', 'echo 127.255.255.255 lost_host >> /etc/hosts'], privileged=True, user='root') + + with pytest.raises(QueryRuntimeException): + node4.query("SELECT * FROM remote('lost_host', 'system', 'one')") + + node4.query("CREATE TABLE distributed_lost_host (dummy UInt8) ENGINE = Distributed(lost_host_cluster, 'system', 'one')") + with pytest.raises(QueryRuntimeException): + node4.query("SELECT * FROM distributed_lost_host") + + node4.exec_in_container(['bash', '-c', 'echo 127.0.0.1 localhost > /etc/hosts'], privileged=True, user='root') + node4.exec_in_container(['bash', '-c', 'echo ::1 localhost >> /etc/hosts'], privileged=True, user='root') + node4.exec_in_container(['bash', '-c', 'echo 127.0.0.1 lost_host >> /etc/hosts'], privileged=True, user='root') + + # Wait a bit until dns cache will be updated + assert_eq_with_retry(node4, "SELECT * FROM remote('lost_host', 'system', 'one')", "0") + assert_eq_with_retry(node4, "SELECT * FROM distributed_lost_host", "0") + + assert TSV(node4.query("SELECT DISTINCT host_name, host_address FROM system.clusters WHERE cluster='lost_host_cluster'")) == TSV("lost_host\t127.0.0.1\n") + assert TSV(node4.query("SELECT hostName()")) == TSV("node4") diff --git a/libs/libcommon/include/common/SimpleCache.h b/libs/libcommon/include/common/SimpleCache.h index 06d29c8ba7a..2cf4348d0d7 100644 --- a/libs/libcommon/include/common/SimpleCache.h +++ b/libs/libcommon/include/common/SimpleCache.h @@ -9,6 +9,7 @@ /** The simplest cache for a free function. * You can also pass a static class method or lambda without captures. * The size is unlimited. Values are stored permanently and never evicted. + * But single record or all cache can be manually dropped. * Mutex is used for synchronization. * Suitable only for the simplest cases. * @@ -53,6 +54,18 @@ public: return res; } + template + void update(Args &&... args) + { + Result res = f(std::forward(args)...); + { + std::lock_guard lock(mutex); + + Key key{std::forward(args)...}; + cache[key] = std::move(res); + } + } + void drop() { std::lock_guard lock(mutex);