Merge pull request #35956 from Algunenano/limit_dns_retries

DNS cache: Add option to drop elements after several consecutive failures
This commit is contained in:
tavplubix 2022-04-07 13:20:36 +03:00 committed by GitHub
commit 6e9c028bbb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 140 additions and 33 deletions

View File

@ -1467,6 +1467,18 @@ The update is performed asynchronously, in a separate system thread.
- [background_schedule_pool_size](../../operations/settings/settings.md#background_schedule_pool_size) - [background_schedule_pool_size](../../operations/settings/settings.md#background_schedule_pool_size)
## dns_max_consecutive_failures {#server-settings-dns-max-consecutive-failures}
The number of consecutive failures accepted when updating a DNS cache entry before it is dropped.
Use `0` to disable cache dropping (entries will only be cleaned by `SYSTEM DROP DNS CACHE`)
**Default value**: 5.
**See also**
- [`SYSTEM DROP DNS CACHE`](../../sql-reference/statements/system.md#query_language-system-drop-dns-cache)
## distributed_ddl {#server-settings-distributed_ddl} ## distributed_ddl {#server-settings-distributed_ddl}
Manage executing [distributed ddl queries](../../sql-reference/distributed-ddl.md) (CREATE, DROP, ALTER, RENAME) on cluster. Manage executing [distributed ddl queries](../../sql-reference/distributed-ddl.md) (CREATE, DROP, ALTER, RENAME) on cluster.

View File

@ -1503,7 +1503,8 @@ int Server::main(const std::vector<std::string> & /*args*/)
else else
{ {
/// Initialize a watcher periodically updating DNS cache /// Initialize a watcher periodically updating DNS cache
dns_cache_updater = std::make_unique<DNSCacheUpdater>(global_context, config().getInt("dns_cache_update_period", 15)); dns_cache_updater = std::make_unique<DNSCacheUpdater>(
global_context, config().getInt("dns_cache_update_period", 15), config().getUInt("dns_max_consecutive_failures", 5));
} }
#if defined(OS_LINUX) #if defined(OS_LINUX)

View File

@ -118,12 +118,15 @@ static DNSResolver::IPAddresses resolveIPAddressImpl(const std::string & host)
} }
catch (const Poco::Net::DNSException & e) catch (const Poco::Net::DNSException & e)
{ {
LOG_ERROR(&Poco::Logger::get("DNSResolver"), "Cannot resolve host ({}), error {}: {}.", host, e.code(), e.message()); LOG_ERROR(&Poco::Logger::get("DNSResolver"), "Cannot resolve host ({}), error {}: {}.", host, e.code(), e.name());
addresses.clear(); addresses.clear();
} }
if (addresses.empty()) if (addresses.empty())
{
ProfileEvents::increment(ProfileEvents::DNSError);
throw Exception("Not found address of host: " + host, ErrorCodes::DNS_ERROR); throw Exception("Not found address of host: " + host, ErrorCodes::DNS_ERROR);
}
return addresses; return addresses;
} }
@ -142,6 +145,9 @@ static String reverseResolveImpl(const Poco::Net::IPAddress & address)
struct DNSResolver::Impl struct DNSResolver::Impl
{ {
using HostWithConsecutiveFailures = std::unordered_map<String, UInt32>;
using AddressWithConsecutiveFailures = std::unordered_map<Poco::Net::IPAddress, UInt32>;
CachedFn<&resolveIPAddressImpl> cache_host; CachedFn<&resolveIPAddressImpl> cache_host;
CachedFn<&reverseResolveImpl> cache_address; CachedFn<&reverseResolveImpl> cache_address;
@ -152,12 +158,12 @@ struct DNSResolver::Impl
std::optional<String> host_name; std::optional<String> host_name;
/// Store hosts, which was asked to resolve from last update of DNS cache. /// Store hosts, which was asked to resolve from last update of DNS cache.
NameSet new_hosts; HostWithConsecutiveFailures new_hosts;
std::unordered_set<Poco::Net::IPAddress> new_addresses; AddressWithConsecutiveFailures new_addresses;
/// Store all hosts, which was whenever asked to resolve /// Store all hosts, which was whenever asked to resolve
NameSet known_hosts; HostWithConsecutiveFailures known_hosts;
std::unordered_set<Poco::Net::IPAddress> known_addresses; AddressWithConsecutiveFailures known_addresses;
/// If disabled, will not make cache lookups, will resolve addresses manually on each call /// If disabled, will not make cache lookups, will resolve addresses manually on each call
std::atomic<bool> disable_cache{false}; std::atomic<bool> disable_cache{false};
@ -247,37 +253,67 @@ static const String & cacheElemToString(const String & str) { return str; }
static String cacheElemToString(const Poco::Net::IPAddress & addr) { return addr.toString(); } static String cacheElemToString(const Poco::Net::IPAddress & addr) { return addr.toString(); }
template <typename UpdateF, typename ElemsT> template <typename UpdateF, typename ElemsT>
bool DNSResolver::updateCacheImpl(UpdateF && update_func, ElemsT && elems, const String & log_msg) bool DNSResolver::updateCacheImpl(
UpdateF && update_func,
ElemsT && elems,
UInt32 max_consecutive_failures,
const String & notfound_log_msg,
const String & dropped_log_msg)
{ {
bool updated = false; bool updated = false;
String lost_elems; String lost_elems;
for (const auto & elem : elems) using iterators = typename std::remove_reference_t<decltype(elems)>::iterator;
std::vector<iterators> elements_to_drop;
for (auto it = elems.begin(); it != elems.end(); it++)
{ {
try try
{ {
updated |= (this->*update_func)(elem); updated |= (this->*update_func)(it->first);
it->second = 0;
} }
catch (const Poco::Net::NetException &) catch (const DB::Exception & e)
{ {
ProfileEvents::increment(ProfileEvents::DNSError); if (e.code() != ErrorCodes::DNS_ERROR)
{
tryLogCurrentException(log, __PRETTY_FUNCTION__);
continue;
}
if (!lost_elems.empty()) if (!lost_elems.empty())
lost_elems += ", "; lost_elems += ", ";
lost_elems += cacheElemToString(elem); lost_elems += cacheElemToString(it->first);
if (max_consecutive_failures)
{
it->second++;
if (it->second >= max_consecutive_failures)
elements_to_drop.emplace_back(it);
}
} }
catch (...) catch (...)
{ {
tryLogCurrentException(__PRETTY_FUNCTION__); tryLogCurrentException(log, __PRETTY_FUNCTION__);
} }
} }
if (!lost_elems.empty()) if (!lost_elems.empty())
LOG_INFO(log, fmt::runtime(log_msg), lost_elems); LOG_INFO(log, fmt::runtime(notfound_log_msg), lost_elems);
if (elements_to_drop.size())
{
updated = true;
String deleted_elements;
for (auto it : elements_to_drop)
{
if (!deleted_elements.empty())
deleted_elements += ", ";
deleted_elements += cacheElemToString(it->first);
elems.erase(it);
}
LOG_INFO(log, fmt::runtime(dropped_log_msg), deleted_elements);
}
return updated; return updated;
} }
bool DNSResolver::updateCache() bool DNSResolver::updateCache(UInt32 max_consecutive_failures)
{ {
LOG_DEBUG(log, "Updating DNS cache"); LOG_DEBUG(log, "Updating DNS cache");
@ -301,8 +337,14 @@ bool DNSResolver::updateCache()
/// DROP DNS CACHE will wait on update_mutex (possibly while holding drop_mutex) /// DROP DNS CACHE will wait on update_mutex (possibly while holding drop_mutex)
std::lock_guard lock(impl->update_mutex); std::lock_guard lock(impl->update_mutex);
bool hosts_updated = updateCacheImpl(&DNSResolver::updateHost, impl->known_hosts, "Cached hosts not found: {}"); bool hosts_updated = updateCacheImpl(
updateCacheImpl(&DNSResolver::updateAddress, impl->known_addresses, "Cached addresses not found: {}"); &DNSResolver::updateHost, impl->known_hosts, max_consecutive_failures, "Cached hosts not found: {}", "Cached hosts dropped: {}");
updateCacheImpl(
&DNSResolver::updateAddress,
impl->known_addresses,
max_consecutive_failures,
"Cached addresses not found: {}",
"Cached addresses dropped: {}");
LOG_DEBUG(log, "Updated DNS cache"); LOG_DEBUG(log, "Updated DNS cache");
return hosts_updated; return hosts_updated;
@ -326,13 +368,15 @@ bool DNSResolver::updateAddress(const Poco::Net::IPAddress & address)
void DNSResolver::addToNewHosts(const String & host) void DNSResolver::addToNewHosts(const String & host)
{ {
std::lock_guard lock(impl->drop_mutex); std::lock_guard lock(impl->drop_mutex);
impl->new_hosts.insert(host); UInt8 consecutive_failures = 0;
impl->new_hosts.insert({host, consecutive_failures});
} }
void DNSResolver::addToNewAddresses(const Poco::Net::IPAddress & address) void DNSResolver::addToNewAddresses(const Poco::Net::IPAddress & address)
{ {
std::lock_guard lock(impl->drop_mutex); std::lock_guard lock(impl->drop_mutex);
impl->new_addresses.insert(address); UInt8 consecutive_failures = 0;
impl->new_addresses.insert({address, consecutive_failures});
} }
DNSResolver::~DNSResolver() = default; DNSResolver::~DNSResolver() = default;

View File

@ -47,14 +47,20 @@ public:
void dropCache(); void dropCache();
/// Updates all known hosts in cache. /// Updates all known hosts in cache.
/// Returns true if IP of any host has been changed. /// Returns true if IP of any host has been changed or an element was dropped (too many failures)
bool updateCache(); bool updateCache(UInt32 max_consecutive_failures);
~DNSResolver(); ~DNSResolver();
private: private:
template <typename UpdateF, typename ElemsT> template <typename UpdateF, typename ElemsT>
bool updateCacheImpl(UpdateF && update_func, ElemsT && elems, const String & log_msg);
bool updateCacheImpl(
UpdateF && update_func,
ElemsT && elems,
UInt32 max_consecutive_failures,
const String & notfound_log_msg,
const String & dropped_log_msg);
DNSResolver(); DNSResolver();

View File

@ -7,9 +7,10 @@
namespace DB namespace DB
{ {
DNSCacheUpdater::DNSCacheUpdater(ContextPtr context_, Int32 update_period_seconds_) DNSCacheUpdater::DNSCacheUpdater(ContextPtr context_, Int32 update_period_seconds_, UInt32 max_consecutive_failures_)
: WithContext(context_) : WithContext(context_)
, update_period_seconds(update_period_seconds_) , update_period_seconds(update_period_seconds_)
, max_consecutive_failures(max_consecutive_failures_)
, pool(getContext()->getSchedulePool()) , pool(getContext()->getSchedulePool())
{ {
task_handle = pool.createTask("DNSCacheUpdater", [this]{ run(); }); task_handle = pool.createTask("DNSCacheUpdater", [this]{ run(); });
@ -20,7 +21,7 @@ void DNSCacheUpdater::run()
auto & resolver = DNSResolver::instance(); auto & resolver = DNSResolver::instance();
/// Reload cluster config if IP of any host has been changed since last update. /// Reload cluster config if IP of any host has been changed since last update.
if (resolver.updateCache()) if (resolver.updateCache(max_consecutive_failures))
{ {
LOG_INFO(&Poco::Logger::get("DNSCacheUpdater"), "IPs of some hosts have been changed. Will reload cluster config."); LOG_INFO(&Poco::Logger::get("DNSCacheUpdater"), "IPs of some hosts have been changed. Will reload cluster config.");
try try

View File

@ -10,7 +10,7 @@ namespace DB
class DNSCacheUpdater : WithContext class DNSCacheUpdater : WithContext
{ {
public: public:
DNSCacheUpdater(ContextPtr context, Int32 update_period_seconds_); DNSCacheUpdater(ContextPtr context, Int32 update_period_seconds_, UInt32 max_consecutive_failures);
~DNSCacheUpdater(); ~DNSCacheUpdater();
void start(); void start();
@ -18,6 +18,7 @@ private:
void run(); void run();
Int32 update_period_seconds; Int32 update_period_seconds;
UInt32 max_consecutive_failures;
BackgroundSchedulePool & pool; BackgroundSchedulePool & pool;
BackgroundSchedulePoolTaskHolder task_handle; BackgroundSchedulePoolTaskHolder task_handle;

View File

@ -1,7 +1,8 @@
[ [
"test_host_ip_change/test.py::test_dns_cache_update", "test_dns_cache/test.py::test_dns_cache_update",
"test_host_ip_change/test.py::test_ip_change_drop_dns_cache", "test_dns_cache/test.py::test_ip_change_drop_dns_cache",
"test_host_ip_change/test.py::test_ip_change_update_dns_cache", "test_dns_cache/test.py::test_ip_change_update_dns_cache",
"test_host_ip_change/test.py::test_user_access_ip_change[node0]", "test_dns_cache/test.py::test_user_access_ip_change[node0]",
"test_host_ip_change/test.py::test_user_access_ip_change[node1]" "test_dns_cache/test.py::test_user_access_ip_change[node1]",
"test_dns_cache/test.py::test_host_is_drop_from_cache_after_consecutive_failures"
] ]

View File

@ -1,3 +1,4 @@
<clickhouse> <clickhouse>
<dns_cache_update_period>1</dns_cache_update_period> <dns_cache_update_period>1</dns_cache_update_period>
<dns_max_consecutive_failures>6</dns_max_consecutive_failures>
</clickhouse> </clickhouse>

View File

@ -285,3 +285,24 @@ def test_user_access_ip_change(cluster_with_dns_cache_update, node):
retry_count=retry_count, retry_count=retry_count,
sleep_time=1, sleep_time=1,
) )
def test_host_is_drop_from_cache_after_consecutive_failures(
cluster_with_dns_cache_update,
):
with pytest.raises(QueryRuntimeException):
node4.query(
"SELECT * FROM remote('InvalidHostThatDoesNotExist', 'system', 'one')"
)
# Note that the list of hosts in variable since lost_host will be there too (and it's dropped and added back)
# dns_update_short -> dns_max_consecutive_failures set to 6
assert node4.wait_for_log_line(
"Cannot resolve host \\(InvalidHostThatDoesNotExist\\), error 0: Host not found."
)
assert node4.wait_for_log_line(
"Cached hosts not found:.*InvalidHostThatDoesNotExist**", repetitions=6
)
assert node4.wait_for_log_line(
"Cached hosts dropped:.*InvalidHostThatDoesNotExist.*"
)

View File

@ -0,0 +1,2 @@
first_check 1
second_check 1

View File

@ -0,0 +1,17 @@
#!/usr/bin/env bash
# Tags: no-parallel
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
current_dns_errors=$($CLICKHOUSE_CLIENT --query "SELECT sum(value) FROM system.events where event = 'DNSError';")
${CLICKHOUSE_CLIENT} --query "SELECT * FROM remote('ThisHostNameDoesNotExistSoItShouldFail', system, one)" 2>/dev/null
${CLICKHOUSE_CLIENT} --query "SELECT 'first_check', sum(value) > ${current_dns_errors} FROM system.events where event = 'DNSError';"
current_dns_errors=$($CLICKHOUSE_CLIENT --query "SELECT sum(value) FROM system.events where event = 'DNSError';")
${CLICKHOUSE_CLIENT} --query "SELECT * FROM remote('ThisHostNameDoesNotExistSoItShouldFail2', system, one)" 2>/dev/null
${CLICKHOUSE_CLIENT} --query "SELECT 'second_check', sum(value) > ${current_dns_errors} FROM system.events where event = 'DNSError';"
${CLICKHOUSE_CLIENT} --query "SYSTEM DROP DNS CACHE"