Merge pull request #5857 from CurtizJ/dns-cache

Implement dns cache with asynchronous update.
This commit is contained in:
alexey-milovidov 2019-07-05 15:56:26 +03:00 committed by GitHub
commit 2469ec1af3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 266 additions and 132 deletions

View File

@ -525,8 +525,8 @@ int Server::main(const std::vector<std::string> & /*args*/)
}
else
{
/// Initialize a watcher updating DNS cache in case of network errors
dns_cache_updater = std::make_unique<DNSCacheUpdater>(*global_context);
/// Initialize a watcher periodically updating DNS cache
dns_cache_updater = std::make_unique<DNSCacheUpdater>(*global_context, config().getInt("dns_cache_update_period", 15));
}
#if defined(__linux__)
@ -773,6 +773,8 @@ int Server::main(const std::vector<std::string> & /*args*/)
main_config_reloader->start();
users_config_reloader->start();
if (dns_cache_updater)
dns_cache_updater->start();
{
std::stringstream message;
@ -823,6 +825,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
log, "Closed connections." << (current_connections ? " But " + toString(current_connections) + " remains."
" Tip: To increase wait time add to config: <shutdown_wait_unfinished>60</shutdown_wait_unfinished>" : ""));
dns_cache_updater.reset();
main_config_reloader.reset();
users_config_reloader.reset();
});

View File

@ -1,14 +1,23 @@
#include "DNSResolver.h"
#include <common/SimpleCache.h>
#include <Common/Exception.h>
#include <Common/ProfileEvents.h>
#include <Core/Names.h>
#include <Core/Types.h>
#include <Poco/Net/DNS.h>
#include <Poco/Net/NetException.h>
#include <Poco/NumberParser.h>
#include <Poco/Logger.h>
#include <common/logger_useful.h>
#include <arpa/inet.h>
#include <atomic>
#include <optional>
namespace ProfileEvents
{
extern Event NetworkErrors;
}
namespace DB
{
@ -67,7 +76,6 @@ static void splitHostAndPort(const std::string & host_and_port, std::string & ou
}
}
static Poco::Net::IPAddress resolveIPAddressImpl(const std::string & host)
{
/// NOTE: Poco::Net::DNS::resolveOne(host) doesn't work for IP addresses like 127.0.0.2
@ -75,15 +83,22 @@ static Poco::Net::IPAddress resolveIPAddressImpl(const std::string & host)
return Poco::Net::SocketAddress(host, 0U).host();
}
struct DNSResolver::Impl
{
SimpleCache<decltype(resolveIPAddressImpl), &resolveIPAddressImpl> cache_host;
std::mutex drop_mutex;
std::mutex update_mutex;
/// Cached server host name
std::mutex mutex;
std::optional<String> host_name;
/// Store hosts, which was asked to resolve from last update of DNS cache.
NameSet new_hosts;
/// Store all hosts, which was whenever asked to resolve
NameSet known_hosts;
/// If disabled, will not make cache lookups, will resolve addresses manually on each call
std::atomic<bool> disable_cache{false};
};
@ -93,28 +108,43 @@ DNSResolver::DNSResolver() : impl(std::make_unique<DNSResolver::Impl>()) {}
Poco::Net::IPAddress DNSResolver::resolveHost(const std::string & host)
{
return !impl->disable_cache ? impl->cache_host(host) : resolveIPAddressImpl(host);
if (impl->disable_cache)
return resolveIPAddressImpl(host);
addToNewHosts(host);
return impl->cache_host(host);
}
Poco::Net::SocketAddress DNSResolver::resolveAddress(const std::string & host_and_port)
{
if (impl->disable_cache)
return Poco::Net::SocketAddress(host_and_port);
String host;
UInt16 port;
splitHostAndPort(host_and_port, host, port);
return !impl->disable_cache ? Poco::Net::SocketAddress(impl->cache_host(host), port) : Poco::Net::SocketAddress(host_and_port);
addToNewHosts(host);
return Poco::Net::SocketAddress(impl->cache_host(host), port);
}
Poco::Net::SocketAddress DNSResolver::resolveAddress(const std::string & host, UInt16 port)
{
return !impl->disable_cache ? Poco::Net::SocketAddress(impl->cache_host(host), port) : Poco::Net::SocketAddress(host, port);
if (impl->disable_cache)
return Poco::Net::SocketAddress(host, port);
addToNewHosts(host);
return Poco::Net::SocketAddress(impl->cache_host(host), port);
}
void DNSResolver::dropCache()
{
impl->cache_host.drop();
std::unique_lock lock(impl->mutex);
std::scoped_lock lock(impl->update_mutex, impl->drop_mutex);
impl->known_hosts.clear();
impl->new_hosts.clear();
impl->host_name.reset();
}
@ -128,7 +158,7 @@ String DNSResolver::getHostName()
if (impl->disable_cache)
return Poco::Net::DNS::hostName();
std::unique_lock lock(impl->mutex);
std::lock_guard lock(impl->drop_mutex);
if (!impl->host_name.has_value())
impl->host_name.emplace(Poco::Net::DNS::hostName());
@ -136,6 +166,61 @@ String DNSResolver::getHostName()
return *impl->host_name;
}
bool DNSResolver::updateCache()
{
{
std::lock_guard lock(impl->drop_mutex);
for (auto & host : impl->new_hosts)
impl->known_hosts.insert(std::move(host));
impl->new_hosts.clear();
impl->host_name.emplace(Poco::Net::DNS::hostName());
}
std::lock_guard lock(impl->update_mutex);
bool updated = false;
String lost_hosts;
for (const auto & host : impl->known_hosts)
{
try
{
updated |= updateHost(host);
}
catch (const Poco::Net::NetException &)
{
ProfileEvents::increment(ProfileEvents::NetworkErrors);
if (!lost_hosts.empty())
lost_hosts += ", ";
lost_hosts += host;
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
if (!lost_hosts.empty())
LOG_INFO(&Logger::get("DNSResolver"), "Cached hosts not found: " << lost_hosts);
return updated;
}
bool DNSResolver::updateHost(const String & host)
{
/// Usage of updateHost implies that host is already in cache and there is no extra computations
auto old_value = impl->cache_host(host);
impl->cache_host.update(host);
return old_value != impl->cache_host(host);
}
void DNSResolver::addToNewHosts(const String & host)
{
std::lock_guard lock(impl->drop_mutex);
impl->new_hosts.insert(host);
}
DNSResolver::~DNSResolver() = default;

View File

@ -4,13 +4,15 @@
#include <memory>
#include <ext/singleton.h>
#include <Core/Types.h>
#include <Core/Names.h>
namespace DB
{
/// A singleton implementing DNS names resolving with optional permanent DNS cache
/// The cache could be updated only manually via drop() method
/// A singleton implementing DNS names resolving with optional DNS cache
/// The cache is being updated asynchronous in separate thread (see DNSCacheUpdater)
/// or it could be updated manually via drop() method.
class DNSResolver : public ext::singleton<DNSResolver>
{
public:
@ -34,9 +36,13 @@ public:
/// Drops all caches
void dropCache();
/// Updates all known hosts in cache.
/// Returns true if IP of any host has been changed.
bool updateCache();
~DNSResolver();
protected:
private:
DNSResolver();
@ -44,6 +50,11 @@ protected:
struct Impl;
std::unique_ptr<Impl> impl;
/// Returns true if IP of host has been changed.
bool updateHost(const String & host);
void addToNewHosts(const String & host);
};
}

View File

@ -2,78 +2,31 @@
#include <Common/DNSResolver.h>
#include <Interpreters/Context.h>
#include <Core/BackgroundSchedulePool.h>
#include <Common/ProfileEvents.h>
#include <Poco/Net/NetException.h>
#include <common/logger_useful.h>
namespace ProfileEvents
{
extern Event NetworkErrors;
}
namespace DB
{
namespace ErrorCodes
{
extern const int TIMEOUT_EXCEEDED;
extern const int ALL_CONNECTION_TRIES_FAILED;
}
/// Call it inside catch section
/// Returns true if it is a network error
static bool isNetworkError()
{
try
{
throw;
}
catch (const Exception & e)
{
if (e.code() == ErrorCodes::TIMEOUT_EXCEEDED || e.code() == ErrorCodes::ALL_CONNECTION_TRIES_FAILED)
return true;
}
catch (Poco::Net::DNSException &)
{
return true;
}
catch (Poco::TimeoutException &)
{
return true;
}
catch (...)
{
/// Do nothing
}
return false;
}
DNSCacheUpdater::DNSCacheUpdater(Context & context_)
: context(context_), pool(context_.getSchedulePool())
DNSCacheUpdater::DNSCacheUpdater(Context & context_, Int32 update_period_seconds_)
: context(context_),
update_period_seconds(update_period_seconds_),
pool(context_.getSchedulePool())
{
task_handle = pool.createTask("DNSCacheUpdater", [this]{ run(); });
}
void DNSCacheUpdater::run()
{
auto num_current_network_exceptions = ProfileEvents::global_counters[ProfileEvents::NetworkErrors].load(std::memory_order_relaxed);
if (num_current_network_exceptions >= last_num_network_erros + min_errors_to_update_cache)
watch.restart();
auto & resolver = DNSResolver::instance();
/// Reload cluster config if IP of any host has been changed since last update.
if (resolver.updateCache())
{
LOG_INFO(&Poco::Logger::get("DNSCacheUpdater"),
"IPs of some hosts have been changed. Will reload cluster config.");
try
{
LOG_INFO(&Poco::Logger::get("DNSCacheUpdater"), "Updating DNS cache");
DNSResolver::instance().dropCache();
context.reloadClusterConfig();
last_num_network_erros = num_current_network_exceptions;
task_handle->scheduleAfter(min_update_period_seconds * 1000);
return;
}
catch (...)
{
@ -81,19 +34,18 @@ void DNSCacheUpdater::run()
}
}
task_handle->scheduleAfter(10 * 1000);
auto interval_ms = std::max(0, update_period_seconds * 1000 - static_cast<Int32>(watch.elapsedMilliseconds()));
task_handle->scheduleAfter(interval_ms);
}
bool DNSCacheUpdater::incrementNetworkErrorEventsIfNeeded()
void DNSCacheUpdater::start()
{
if (isNetworkError())
{
ProfileEvents::increment(ProfileEvents::NetworkErrors);
return true;
}
task_handle->activateAndSchedule();
}
return false;
DNSCacheUpdater::~DNSCacheUpdater()
{
task_handle->deactivate();
}
}

View File

@ -1,10 +1,6 @@
#pragma once
#include <memory>
#include <ctime>
#include <cstddef>
#include <Core/BackgroundSchedulePool.h>
#include <Common/Stopwatch.h>
namespace DB
@ -12,26 +8,23 @@ namespace DB
class Context;
/// Add a task to BackgroundProcessingPool that watch for ProfileEvents::NetworkErrors and updates DNS cache if it has increased
/// Add a task to BackgroundProcessingPool that resolves all hosts and updates cache with constant period.
class DNSCacheUpdater
{
public:
explicit DNSCacheUpdater(Context & context);
/// Checks if it is a network error and increments ProfileEvents::NetworkErrors
static bool incrementNetworkErrorEventsIfNeeded();
explicit DNSCacheUpdater(Context & context, Int32 update_period_seconds_);
~DNSCacheUpdater();
void start();
private:
void run();
Context & context;
Int32 update_period_seconds;
BackgroundSchedulePool & pool;
BackgroundSchedulePoolTaskHolder task_handle;
size_t last_num_network_erros = 0;
static constexpr size_t min_errors_to_update_cache = 3;
static constexpr time_t min_update_period_seconds = 45;
Stopwatch watch;
};

View File

@ -431,8 +431,6 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
if (!internal)
onExceptionBeforeStart(query, context, current_time);
DNSCacheUpdater::incrementNetworkErrorEventsIfNeeded();
throw;
}

View File

@ -206,7 +206,6 @@ void BackgroundProcessingPool::threadFunction()
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
DNSCacheUpdater::incrementNetworkErrorEventsIfNeeded();
}
if (shutdown)

View File

@ -0,0 +1,3 @@
<yandex>
<dns_cache_update_period>200</dns_cache_update_period>
</yandex>

View File

@ -0,0 +1,3 @@
<yandex>
<dns_cache_update_period>2</dns_cache_update_period>
</yandex>

View File

@ -1,14 +1,13 @@
<yandex>
<remote_servers>
<test_cluster>
<shard>
<internal_replication>true</internal_replication>
<replica>
<default_database>shard_0</default_database>
<host>node1</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
<lost_host_cluster>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>lost_host</host>
<port>9000</port>
</replica>
</shard>
</lost_host_cluster>
</remote_servers>
</yandex>

View File

@ -4,28 +4,33 @@ import pytest
import subprocess
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import assert_eq_with_retry
from helpers.client import QueryRuntimeException
from helpers.test_tools import TSV
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', main_configs=['configs/remote_servers.xml', 'configs/listen_host.xml'], with_zookeeper=True, ipv6_address='2001:3984:3989::1:1111')
node2 = cluster.add_instance('node2', main_configs=['configs/remote_servers.xml', 'configs/listen_host.xml'], with_zookeeper=True, ipv6_address='2001:3984:3989::1:1112')
def _fill_nodes(nodes, table_name):
for node in nodes:
node.query(
'''
CREATE DATABASE IF NOT EXISTS test;
CREATE TABLE IF NOT EXISTS {0}(date Date, id UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/{0}', '{1}')
ORDER BY id PARTITION BY toYYYYMM(date);
'''.format(table_name, node.name)
)
node1 = cluster.add_instance('node1', main_configs=['configs/listen_host.xml'], with_zookeeper=True, ipv6_address='2001:3984:3989::1:1111')
node2 = cluster.add_instance('node2', main_configs=['configs/listen_host.xml', 'configs/dns_update_long.xml'],
with_zookeeper=True, ipv6_address='2001:3984:3989::1:1112')
@pytest.fixture(scope="module")
def start_cluster():
def cluster_without_dns_cache_update():
try:
cluster.start()
for node in [node1, node2]:
node.query(
'''
CREATE DATABASE IF NOT EXISTS test;
CREATE TABLE IF NOT EXISTS test_table(date Date, id UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/replicated', '{}')
ORDER BY id PARTITION BY toYYYYMM(date);
'''.format(node.name)
)
_fill_nodes([node1, node2], 'test_table_drop')
yield cluster
@ -36,31 +41,101 @@ def start_cluster():
cluster.shutdown()
pass
def test_merge_doesnt_work_without_zookeeper(start_cluster):
# node1 is a source, node2 downloads data
# node2 has long dns_cache_update_period, so dns cache update wouldn't work
def test_ip_change_drop_dns_cache(cluster_without_dns_cache_update):
# First we check, that normal replication works
node1.query("INSERT INTO test_table VALUES ('2018-10-01', 1), ('2018-10-02', 2), ('2018-10-03', 3)")
assert node1.query("SELECT count(*) from test_table") == "3\n"
assert_eq_with_retry(node2, "SELECT count(*) from test_table", "3")
node1.query("INSERT INTO test_table_drop VALUES ('2018-10-01', 1), ('2018-10-02', 2), ('2018-10-03', 3)")
assert node1.query("SELECT count(*) from test_table_drop") == "3\n"
assert_eq_with_retry(node2, "SELECT count(*) from test_table_drop", "3")
# We change source node ip
cluster.restart_instance_with_ip_change(node1, "2001:3984:3989::1:7777")
# Put some data to source node1
node1.query("INSERT INTO test_table VALUES ('2018-10-01', 5), ('2018-10-02', 6), ('2018-10-03', 7)")
node1.query("INSERT INTO test_table_drop VALUES ('2018-10-01', 5), ('2018-10-02', 6), ('2018-10-03', 7)")
# Check that data is placed on node1
assert node1.query("SELECT count(*) from test_table") == "6\n"
assert node1.query("SELECT count(*) from test_table_drop") == "6\n"
# Because of DNS cache dest node2 cannot download data from node1
with pytest.raises(Exception):
assert_eq_with_retry(node2, "SELECT count(*) from test_table", "6")
assert_eq_with_retry(node2, "SELECT count(*) from test_table_drop", "6")
# drop DNS cache
node2.query("SYSTEM DROP DNS CACHE")
# Data is downloaded
assert_eq_with_retry(node2, "SELECT count(*) from test_table", "6")
assert_eq_with_retry(node2, "SELECT count(*) from test_table_drop", "6")
# Just to be sure check one more time
node1.query("INSERT INTO test_table VALUES ('2018-10-01', 8)")
assert node1.query("SELECT count(*) from test_table") == "7\n"
assert_eq_with_retry(node2, "SELECT count(*) from test_table", "7")
node1.query("INSERT INTO test_table_drop VALUES ('2018-10-01', 8)")
assert node1.query("SELECT count(*) from test_table_drop") == "7\n"
assert_eq_with_retry(node2, "SELECT count(*) from test_table_drop", "7")
node3 = cluster.add_instance('node3', main_configs=['configs/listen_host.xml'],
with_zookeeper=True, ipv6_address='2001:3984:3989::1:1113')
node4 = cluster.add_instance('node4', main_configs=['configs/remote_servers.xml', 'configs/listen_host.xml', 'configs/dns_update_short.xml'],
with_zookeeper=True, ipv6_address='2001:3984:3989::1:1114')
@pytest.fixture(scope="module")
def cluster_with_dns_cache_update():
try:
cluster.start()
_fill_nodes([node3, node4], 'test_table_update')
yield cluster
except Exception as ex:
print ex
finally:
cluster.shutdown()
pass
# node3 is a source, node4 downloads data
# node4 has short dns_cache_update_period, so testing update of dns cache
def test_ip_change_update_dns_cache(cluster_with_dns_cache_update):
# First we check, that normal replication works
node3.query("INSERT INTO test_table_update VALUES ('2018-10-01', 1), ('2018-10-02', 2), ('2018-10-03', 3)")
assert node3.query("SELECT count(*) from test_table_update") == "3\n"
assert_eq_with_retry(node4, "SELECT count(*) from test_table_update", "3")
# We change source node ip
cluster.restart_instance_with_ip_change(node3, "2001:3984:3989::1:8888")
# Put some data to source node3
node3.query("INSERT INTO test_table_update VALUES ('2018-10-01', 5), ('2018-10-02', 6), ('2018-10-03', 7)")
# Check that data is placed on node3
assert node3.query("SELECT count(*) from test_table_update") == "6\n"
# Because of DNS cache update, ip of node3 would be updated
assert_eq_with_retry(node4, "SELECT count(*) from test_table_update", "6")
# Just to be sure check one more time
node3.query("INSERT INTO test_table_update VALUES ('2018-10-01', 8)")
assert node3.query("SELECT count(*) from test_table_update") == "7\n"
assert_eq_with_retry(node4, "SELECT count(*) from test_table_update", "7")
def test_dns_cache_update(cluster_with_dns_cache_update):
node4.exec_in_container(['bash', '-c', 'echo 127.0.0.1 localhost > /etc/hosts'], privileged=True, user='root')
node4.exec_in_container(['bash', '-c', 'echo ::1 localhost >> /etc/hosts'], privileged=True, user='root')
node4.exec_in_container(['bash', '-c', 'echo 127.255.255.255 lost_host >> /etc/hosts'], privileged=True, user='root')
with pytest.raises(QueryRuntimeException):
node4.query("SELECT * FROM remote('lost_host', 'system', 'one')")
node4.query("CREATE TABLE distributed_lost_host (dummy UInt8) ENGINE = Distributed(lost_host_cluster, 'system', 'one')")
with pytest.raises(QueryRuntimeException):
node4.query("SELECT * FROM distributed_lost_host")
node4.exec_in_container(['bash', '-c', 'echo 127.0.0.1 localhost > /etc/hosts'], privileged=True, user='root')
node4.exec_in_container(['bash', '-c', 'echo ::1 localhost >> /etc/hosts'], privileged=True, user='root')
node4.exec_in_container(['bash', '-c', 'echo 127.0.0.1 lost_host >> /etc/hosts'], privileged=True, user='root')
# Wait a bit until dns cache will be updated
assert_eq_with_retry(node4, "SELECT * FROM remote('lost_host', 'system', 'one')", "0")
assert_eq_with_retry(node4, "SELECT * FROM distributed_lost_host", "0")
assert TSV(node4.query("SELECT DISTINCT host_name, host_address FROM system.clusters WHERE cluster='lost_host_cluster'")) == TSV("lost_host\t127.0.0.1\n")
assert TSV(node4.query("SELECT hostName()")) == TSV("node4")

View File

@ -9,6 +9,7 @@
/** The simplest cache for a free function.
* You can also pass a static class method or lambda without captures.
* The size is unlimited. Values are stored permanently and never evicted.
* But single record or all cache can be manually dropped.
* Mutex is used for synchronization.
* Suitable only for the simplest cases.
*
@ -53,6 +54,18 @@ public:
return res;
}
template <typename... Args>
void update(Args &&... args)
{
Result res = f(std::forward<Args>(args)...);
{
std::lock_guard lock(mutex);
Key key{std::forward<Args>(args)...};
cache[key] = std::move(res);
}
}
void drop()
{
std::lock_guard lock(mutex);