DNS cache: Add option to drop elements after several consecutive failures

This commit is contained in:
Raúl Marín 2022-04-05 13:00:14 +02:00
parent 12ac4cc731
commit 8a05cf3927
14 changed files with 103 additions and 32 deletions

View File

@ -1467,6 +1467,18 @@ The update is performed asynchronously, in a separate system thread.
- [background_schedule_pool_size](../../operations/settings/settings.md#background_schedule_pool_size)
## dns_max_consecutive_failures {#server-settings-dns-max-consecutive-failures}
The number of consecutive failures accepted when updating a DNS cache entry before it is dropped.
Use `0` to disable cache dropping (entries will only be cleaned by `SYSTEM DROP DNS CACHE`)
**Default value**: 5.
**See also**
- [`SYSTEM DROP DNS CACHE`](../../sql-reference/statements/system.md#query_language-system-drop-dns-cache)
## distributed_ddl {#server-settings-distributed_ddl}
Manage executing [distributed ddl queries](../../sql-reference/distributed-ddl.md) (CREATE, DROP, ALTER, RENAME) on cluster.

View File

@ -1503,7 +1503,8 @@ int Server::main(const std::vector<std::string> & /*args*/)
else
{
/// Initialize a watcher periodically updating DNS cache
dns_cache_updater = std::make_unique<DNSCacheUpdater>(global_context, config().getInt("dns_cache_update_period", 15));
dns_cache_updater = std::make_unique<DNSCacheUpdater>(
global_context, config().getInt("dns_cache_update_period", 15), config().getUInt("dns_max_consecutive_failures", 5));
}
#if defined(OS_LINUX)

View File

@ -118,12 +118,12 @@ static DNSResolver::IPAddresses resolveIPAddressImpl(const std::string & host)
}
catch (const Poco::Net::DNSException & e)
{
LOG_ERROR(&Poco::Logger::get("DNSResolver"), "Cannot resolve host ({}), error {}: {}.", host, e.code(), e.message());
LOG_ERROR(&Poco::Logger::get("DNSResolver"), "Cannot resolve host ({}), error {}: {}.", host, e.code(), e.name());
addresses.clear();
}
if (addresses.empty())
throw Exception("Not found address of host: " + host, ErrorCodes::DNS_ERROR);
throw Poco::Net::DNSException("Not found address of host: " + host);
return addresses;
}
@ -142,6 +142,9 @@ static String reverseResolveImpl(const Poco::Net::IPAddress & address)
struct DNSResolver::Impl
{
using HostWithConsecutiveFailures = std::unordered_map<String, UInt8>;
using AddressWithConsecutiveFailures = std::unordered_map<Poco::Net::IPAddress, UInt8>;
CachedFn<&resolveIPAddressImpl> cache_host;
CachedFn<&reverseResolveImpl> cache_address;
@ -152,12 +155,12 @@ struct DNSResolver::Impl
std::optional<String> host_name;
/// Store hosts, which was asked to resolve from last update of DNS cache.
NameSet new_hosts;
std::unordered_set<Poco::Net::IPAddress> new_addresses;
HostWithConsecutiveFailures new_hosts;
AddressWithConsecutiveFailures new_addresses;
/// Store all hosts, which was whenever asked to resolve
NameSet known_hosts;
std::unordered_set<Poco::Net::IPAddress> known_addresses;
HostWithConsecutiveFailures known_hosts;
AddressWithConsecutiveFailures known_addresses;
/// If disabled, will not make cache lookups, will resolve addresses manually on each call
std::atomic<bool> disable_cache{false};
@ -247,37 +250,63 @@ static const String & cacheElemToString(const String & str) { return str; }
static String cacheElemToString(const Poco::Net::IPAddress & addr) { return addr.toString(); }
template <typename UpdateF, typename ElemsT>
bool DNSResolver::updateCacheImpl(UpdateF && update_func, ElemsT && elems, const String & log_msg)
bool DNSResolver::updateCacheImpl(
UpdateF && update_func,
ElemsT && elems,
UInt32 max_consecutive_failures,
const String & notfound_log_msg,
const String & dropped_log_msg)
{
bool updated = false;
String lost_elems;
for (const auto & elem : elems)
using iterators = typename std::remove_reference_t<decltype(elems)>::iterator;
std::vector<iterators> elements_to_drop;
for (auto it = elems.begin(); it != elems.end(); it++)
{
try
{
updated |= (this->*update_func)(elem);
updated |= (this->*update_func)(it->first);
it->second = 0;
}
catch (const Poco::Net::NetException &)
{
ProfileEvents::increment(ProfileEvents::DNSError);
if (!lost_elems.empty())
lost_elems += ", ";
lost_elems += cacheElemToString(elem);
lost_elems += cacheElemToString(it->first);
if (max_consecutive_failures)
{
it->second++;
if (it->second >= max_consecutive_failures)
elements_to_drop.emplace_back(it);
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
tryLogCurrentException(log, __PRETTY_FUNCTION__);
}
}
if (!lost_elems.empty())
LOG_INFO(log, fmt::runtime(log_msg), lost_elems);
LOG_INFO(log, fmt::runtime(notfound_log_msg), lost_elems);
if (elements_to_drop.size())
{
updated = true;
String deleted_elements;
for (auto it : elements_to_drop)
{
if (!deleted_elements.empty())
deleted_elements += ", ";
deleted_elements += cacheElemToString(it->first);
elems.erase(it);
}
LOG_INFO(log, fmt::runtime(dropped_log_msg), deleted_elements);
}
return updated;
}
bool DNSResolver::updateCache()
bool DNSResolver::updateCache(UInt32 max_consecutive_failures)
{
LOG_DEBUG(log, "Updating DNS cache");
@ -301,8 +330,14 @@ bool DNSResolver::updateCache()
/// DROP DNS CACHE will wait on update_mutex (possibly while holding drop_mutex)
std::lock_guard lock(impl->update_mutex);
bool hosts_updated = updateCacheImpl(&DNSResolver::updateHost, impl->known_hosts, "Cached hosts not found: {}");
updateCacheImpl(&DNSResolver::updateAddress, impl->known_addresses, "Cached addresses not found: {}");
bool hosts_updated = updateCacheImpl(
&DNSResolver::updateHost, impl->known_hosts, max_consecutive_failures, "Cached hosts not found: {}", "Cached hosts dropped: {}");
updateCacheImpl(
&DNSResolver::updateAddress,
impl->known_addresses,
max_consecutive_failures,
"Cached addresses not found: {}",
"Cached addresses dropped: {}");
LOG_DEBUG(log, "Updated DNS cache");
return hosts_updated;
@ -326,13 +361,15 @@ bool DNSResolver::updateAddress(const Poco::Net::IPAddress & address)
void DNSResolver::addToNewHosts(const String & host)
{
std::lock_guard lock(impl->drop_mutex);
impl->new_hosts.insert(host);
UInt8 consecutive_failures = 0;
impl->new_hosts.insert({host, consecutive_failures});
}
void DNSResolver::addToNewAddresses(const Poco::Net::IPAddress & address)
{
std::lock_guard lock(impl->drop_mutex);
impl->new_addresses.insert(address);
UInt8 consecutive_failures = 0;
impl->new_addresses.insert({address, consecutive_failures});
}
DNSResolver::~DNSResolver() = default;

View File

@ -47,14 +47,20 @@ public:
void dropCache();
/// Updates all known hosts in cache.
/// Returns true if IP of any host has been changed.
bool updateCache();
/// Returns true if IP of any host has been changed or an element was dropped (too many failures)
bool updateCache(UInt32 max_consecutive_failures);
~DNSResolver();
private:
template <typename UpdateF, typename ElemsT>
bool updateCacheImpl(UpdateF && update_func, ElemsT && elems, const String & log_msg);
bool updateCacheImpl(
UpdateF && update_func,
ElemsT && elems,
UInt32 max_consecutive_failures,
const String & notfound_log_msg,
const String & dropped_log_msg);
DNSResolver();

View File

@ -7,9 +7,10 @@
namespace DB
{
DNSCacheUpdater::DNSCacheUpdater(ContextPtr context_, Int32 update_period_seconds_)
DNSCacheUpdater::DNSCacheUpdater(ContextPtr context_, Int32 update_period_seconds_, UInt32 max_consecutive_failures_)
: WithContext(context_)
, update_period_seconds(update_period_seconds_)
, max_consecutive_failures(max_consecutive_failures_)
, pool(getContext()->getSchedulePool())
{
task_handle = pool.createTask("DNSCacheUpdater", [this]{ run(); });
@ -20,7 +21,7 @@ void DNSCacheUpdater::run()
auto & resolver = DNSResolver::instance();
/// Reload cluster config if IP of any host has been changed since last update.
if (resolver.updateCache())
if (resolver.updateCache(max_consecutive_failures))
{
LOG_INFO(&Poco::Logger::get("DNSCacheUpdater"), "IPs of some hosts have been changed. Will reload cluster config.");
try

View File

@ -10,7 +10,7 @@ namespace DB
class DNSCacheUpdater : WithContext
{
public:
DNSCacheUpdater(ContextPtr context, Int32 update_period_seconds_);
DNSCacheUpdater(ContextPtr context, Int32 update_period_seconds_, UInt32 max_consecutive_failures);
~DNSCacheUpdater();
void start();
@ -18,6 +18,7 @@ private:
void run();
Int32 update_period_seconds;
UInt32 max_consecutive_failures;
BackgroundSchedulePool & pool;
BackgroundSchedulePoolTaskHolder task_handle;

View File

@ -1,7 +1,8 @@
[
"test_host_ip_change/test.py::test_dns_cache_update",
"test_host_ip_change/test.py::test_ip_change_drop_dns_cache",
"test_host_ip_change/test.py::test_ip_change_update_dns_cache",
"test_host_ip_change/test.py::test_user_access_ip_change[node0]",
"test_host_ip_change/test.py::test_user_access_ip_change[node1]"
"test_dns_cache/test.py::test_dns_cache_update",
"test_dns_cache/test.py::test_ip_change_drop_dns_cache",
"test_dns_cache/test.py::test_ip_change_update_dns_cache",
"test_dns_cache/test.py::test_user_access_ip_change[node0]",
"test_dns_cache/test.py::test_user_access_ip_change[node1]",
"test_dns_cache/test.py::test_host_is_drop_from_cache_after_consecutive_failures"
]

View File

@ -1,3 +1,4 @@
<clickhouse>
<dns_cache_update_period>1</dns_cache_update_period>
<dns_max_consecutive_failures>6</dns_max_consecutive_failures>
</clickhouse>

View File

@ -285,3 +285,14 @@ def test_user_access_ip_change(cluster_with_dns_cache_update, node):
retry_count=retry_count,
sleep_time=1,
)
def test_host_is_drop_from_cache_after_consecutive_failures(cluster_with_dns_cache_update):
with pytest.raises(QueryRuntimeException):
node4.query("SELECT * FROM remote('InvalidHostThatDoesNotExist', 'system', 'one')")
# Note that the list of hosts in variable since lost_host will be there too (and it's dropped and added back)
# dns_update_short -> dns_max_consecutive_failures set to 6
assert node4.wait_for_log_line("Cannot resolve host \\(InvalidHostThatDoesNotExist\\), error 0: Host not found.")
assert node4.wait_for_log_line("Cached hosts not found:.*InvalidHostThatDoesNotExist**", repetitions=6)
assert node4.wait_for_log_line("Cached hosts dropped:.*InvalidHostThatDoesNotExist.*")