From d8856b06d873797117ca260d009d55a7632bcb58 Mon Sep 17 00:00:00 2001 From: Sema Checherinda Date: Mon, 13 May 2024 20:06:58 +0200 Subject: [PATCH] add unit tests, add new counter AddressesBanned --- src/Common/CurrentMetrics.cpp | 1 + src/Common/HostResolvePool.cpp | 42 ++++++-- src/Common/HostResolvePool.h | 29 ++--- src/Common/tests/gtest_resolve_pool.cpp | 135 +++++++++++++++++++++++- 4 files changed, 185 insertions(+), 22 deletions(-) diff --git a/src/Common/CurrentMetrics.cpp b/src/Common/CurrentMetrics.cpp index 0f25397a961..01dd8271459 100644 --- a/src/Common/CurrentMetrics.cpp +++ b/src/Common/CurrentMetrics.cpp @@ -285,6 +285,7 @@ M(HTTPConnectionsTotal, "Total count of all sessions: stored in the pool and actively used right now for http hosts") \ \ M(AddressesActive, "Total count of addresses which are used for creation connections with connection pools") \ + M(AddressesBanned, "Total count of addresses which are banned as faulty for creation connections with connection pools") \ #ifdef APPLY_FOR_EXTERNAL_METRICS diff --git a/src/Common/HostResolvePool.cpp b/src/Common/HostResolvePool.cpp index 1b783bc596f..cad64ee7204 100644 --- a/src/Common/HostResolvePool.cpp +++ b/src/Common/HostResolvePool.cpp @@ -8,6 +8,8 @@ #include #include +#include + namespace ProfileEvents { @@ -19,6 +21,7 @@ namespace ProfileEvents namespace CurrentMetrics { extern const Metric AddressesActive; + extern const Metric AddressesBanned; } namespace DB @@ -36,6 +39,7 @@ HostResolverMetrics HostResolver::getMetrics() .expired = ProfileEvents::AddressesExpired, .failed = ProfileEvents::AddressesMarkedAsFailed, .active_count = CurrentMetrics::AddressesActive, + .banned_count = CurrentMetrics::AddressesBanned, }; } @@ -47,7 +51,7 @@ HostResolver::WeakPtr HostResolver::getWeakFromThis() HostResolver::HostResolver(String host_, Poco::Timespan history_) : host(std::move(host_)) , history(history_) - , resolve_function([](const String & host_to_resolve) { return DNSResolver::instance().resolveHostAll(host_to_resolve); }) + , resolve_function([](const String & host_to_resolve) { return DNSResolver::instance().resolveHostAllInOriginOrder(host_to_resolve); }) { update(); } @@ -62,6 +66,12 @@ HostResolver::HostResolver( HostResolver::~HostResolver() { std::lock_guard lock(mutex); + + auto banned_count = 0; + for (const auto & rec: records) + banned_count += rec.failed; + CurrentMetrics::sub(metrics.banned_count, banned_count); + CurrentMetrics::sub(metrics.active_count, records.size()); records.clear(); } @@ -113,6 +123,7 @@ void HostResolver::updateWeights() if (getTotalWeight() == 0 && !records.empty()) { + CurrentMetrics::sub(metrics.banned_count, records.size()); for (auto & rec : records) rec.failed = false; @@ -158,7 +169,8 @@ void HostResolver::setFail(const Poco::Net::IPAddress & address) if (it == records.end()) return; - it->setFail(now); + if (it->setFail(now)) + CurrentMetrics::add(metrics.banned_count); } ProfileEvents::increment(metrics.failed); @@ -215,17 +227,20 @@ void HostResolver::updateImpl(Poco::Timestamp now, std::vectorfailed) + CurrentMetrics::sub(metrics.banned_count); } ++it_before; } else if (it_before == records.end() || (it_next != next_gen.end() && *it_next < it_before->address)) { - CurrentMetrics::add(metrics.active_count, 1); - ProfileEvents::increment(metrics.discovered, 1); + /// there are could be duplicates in next_gen vector if (merged.empty() || merged.back().address != *it_next) + { + CurrentMetrics::add(metrics.active_count, 1); + ProfileEvents::increment(metrics.discovered, 1); merged.push_back(Record(*it_next, now)); - else - merged.back().resolve_time = now; + } ++it_next; } else @@ -239,9 +254,22 @@ void HostResolver::updateImpl(Poco::Timestamp now, std::vector #include #include +#include "base/defines.h" +#include #include #include @@ -29,8 +31,9 @@ protected: DB::CurrentThread::getProfileEvents().reset(); ASSERT_EQ(0, CurrentMetrics::get(metrics.active_count)); + ASSERT_EQ(0, CurrentMetrics::get(metrics.banned_count)); - addresses = std::set{"127.0.0.1", "127.0.0.2", "127.0.0.3"}; + addresses = std::multiset{"127.0.0.1", "127.0.0.2", "127.0.0.3"}; // Code here will be called immediately after the constructor (right // before each test). } @@ -58,7 +61,7 @@ protected: } DB::HostResolverMetrics metrics = DB::HostResolver::getMetrics(); - std::set addresses; + std::multiset addresses; }; TEST_F(ResolvePoolTest, CanResolve) @@ -160,7 +163,7 @@ TEST_F(ResolvePoolTest, CanMerge) ASSERT_EQ(addresses.size(), DB::CurrentThread::getProfileEvents()[metrics.discovered]); auto old_addresses = addresses; - addresses = std::set{"127.0.0.4", "127.0.0.5"}; + addresses = std::multiset{"127.0.0.4", "127.0.0.5"}; resolver->update(); @@ -229,6 +232,7 @@ TEST_F(ResolvePoolTest, CanFail) ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.failed]); ASSERT_EQ(addresses.size(), CurrentMetrics::get(metrics.active_count)); + ASSERT_EQ(1, CurrentMetrics::get(metrics.banned_count)); ASSERT_EQ(addresses.size(), DB::CurrentThread::getProfileEvents()[metrics.discovered]); for (size_t i = 0; i < 1000; ++i) @@ -243,15 +247,20 @@ TEST_F(ResolvePoolTest, CanFail) TEST_F(ResolvePoolTest, CanFailAndHeal) { auto resolver = make_resolver(); + ASSERT_EQ(0, CurrentMetrics::get(metrics.banned_count)); auto failed_addr = resolver->resolve(); failed_addr.setFail(); + ASSERT_EQ(1, CurrentMetrics::get(metrics.banned_count)); while (true) { auto next_addr = resolver->resolve(); if (*failed_addr == *next_addr) + { + ASSERT_EQ(0, CurrentMetrics::get(metrics.banned_count)); break; + } } } @@ -277,3 +286,123 @@ TEST_F(ResolvePoolTest, CanExpire) ASSERT_EQ(addresses.size() + 1, DB::CurrentThread::getProfileEvents()[metrics.discovered]); ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.expired]); } + + +TEST_F(ResolvePoolTest, DuplicatesInAddresses) +{ + auto resolver = make_resolver(); + + size_t unuque_addresses = addresses.size(); + + ASSERT_EQ(3, unuque_addresses); + ASSERT_EQ(3, DB::CurrentThread::getProfileEvents()[metrics.discovered]); + + ASSERT_TRUE(!addresses.empty()); + addresses.insert(*addresses.begin()); + addresses.insert(*addresses.begin()); + + size_t total_addresses = addresses.size(); + + ASSERT_EQ(addresses.count(*addresses.begin()), 3); + ASSERT_EQ(unuque_addresses + 2, total_addresses); + + resolver->update(); + ASSERT_EQ(3, DB::CurrentThread::getProfileEvents()[metrics.discovered]); +} + +void check_no_failed_address(size_t iteration, auto & resolver, auto & addresses, auto & failed_addr, auto & metrics) +{ + ASSERT_EQ(iteration, DB::CurrentThread::getProfileEvents()[metrics.failed]); + for (size_t i = 0; i < 100; ++i) + { + auto next_addr = resolver->resolve(); + ASSERT_TRUE(addresses.contains(*next_addr)); + ASSERT_NE(*next_addr, *failed_addr); + } +} + +TEST_F(ResolvePoolTest, BannedForConsiquenceFail) +{ + size_t history_ms = 5; + auto resolver = make_resolver(history_ms); + + + auto failed_addr = resolver->resolve(); + ASSERT_TRUE(addresses.contains(*failed_addr)); + + failed_addr.setFail(); + ASSERT_EQ(3, CurrentMetrics::get(metrics.active_count)); + ASSERT_EQ(1, CurrentMetrics::get(metrics.banned_count)); + check_no_failed_address(1, resolver, addresses, failed_addr, metrics); + + sleepForMilliseconds(history_ms + 1); + resolver->update(); + ASSERT_EQ(3, CurrentMetrics::get(metrics.active_count)); + ASSERT_EQ(0, CurrentMetrics::get(metrics.banned_count)); + + failed_addr.setFail(); + check_no_failed_address(2, resolver, addresses, failed_addr, metrics); + + sleepForMilliseconds(history_ms + 1); + resolver->update(); + ASSERT_EQ(3, CurrentMetrics::get(metrics.active_count)); + ASSERT_EQ(1, CurrentMetrics::get(metrics.banned_count)); + + // ip still banned adter history_ms + update, because it was his second consiquent fail + check_no_failed_address(2, resolver, addresses, failed_addr, metrics); +} + +TEST_F(ResolvePoolTest, NoAditionalBannForConcurrentFail) +{ + size_t history_ms = 5; + auto resolver = make_resolver(history_ms); + + auto failed_addr = resolver->resolve(); + ASSERT_TRUE(addresses.contains(*failed_addr)); + + failed_addr.setFail(); + failed_addr.setFail(); + failed_addr.setFail(); + + ASSERT_EQ(3, CurrentMetrics::get(metrics.active_count)); + ASSERT_EQ(1, CurrentMetrics::get(metrics.banned_count)); + check_no_failed_address(3, resolver, addresses, failed_addr, metrics); + + sleepForMilliseconds(history_ms + 1); + resolver->update(); + // ip is cleared after just 1 history_ms interval. + ASSERT_EQ(3, CurrentMetrics::get(metrics.active_count)); + ASSERT_EQ(0, CurrentMetrics::get(metrics.banned_count)); +} + +TEST_F(ResolvePoolTest, StillBannedAfterSuccess) +{ + size_t history_ms = 5; + auto resolver = make_resolver(history_ms); + + auto failed_addr = resolver->resolve(); + ASSERT_TRUE(addresses.contains(*failed_addr)); + + std::optionalresolve())> again_addr; + while (true) + { + auto addr = resolver->resolve(); + if (*addr == *failed_addr) + { + again_addr.emplace(std::move(addr)); + break; + } + } + chassert(again_addr); + + failed_addr.setFail(); + + ASSERT_EQ(3, CurrentMetrics::get(metrics.active_count)); + ASSERT_EQ(1, CurrentMetrics::get(metrics.banned_count)); + check_no_failed_address(1, resolver, addresses, failed_addr, metrics); + + again_addr = std::nullopt; // success; + + ASSERT_EQ(3, CurrentMetrics::get(metrics.active_count)); + ASSERT_EQ(1, CurrentMetrics::get(metrics.banned_count)); +}