add unit tests, add new counter AddressesBanned

This commit is contained in:
Sema Checherinda 2024-05-13 20:06:58 +02:00
parent 58c53fa50b
commit d8856b06d8
4 changed files with 185 additions and 22 deletions

View File

@ -285,6 +285,7 @@
M(HTTPConnectionsTotal, "Total count of all sessions: stored in the pool and actively used right now for http hosts") \ M(HTTPConnectionsTotal, "Total count of all sessions: stored in the pool and actively used right now for http hosts") \
\ \
M(AddressesActive, "Total count of addresses which are used for creation connections with connection pools") \ M(AddressesActive, "Total count of addresses which are used for creation connections with connection pools") \
M(AddressesBanned, "Total count of addresses which are banned as faulty for creation connections with connection pools") \
#ifdef APPLY_FOR_EXTERNAL_METRICS #ifdef APPLY_FOR_EXTERNAL_METRICS

View File

@ -8,6 +8,8 @@
#include <Common/MemoryTrackerSwitcher.h> #include <Common/MemoryTrackerSwitcher.h>
#include <mutex> #include <mutex>
#include <algorithm>
namespace ProfileEvents namespace ProfileEvents
{ {
@ -19,6 +21,7 @@ namespace ProfileEvents
namespace CurrentMetrics namespace CurrentMetrics
{ {
extern const Metric AddressesActive; extern const Metric AddressesActive;
extern const Metric AddressesBanned;
} }
namespace DB namespace DB
@ -36,6 +39,7 @@ HostResolverMetrics HostResolver::getMetrics()
.expired = ProfileEvents::AddressesExpired, .expired = ProfileEvents::AddressesExpired,
.failed = ProfileEvents::AddressesMarkedAsFailed, .failed = ProfileEvents::AddressesMarkedAsFailed,
.active_count = CurrentMetrics::AddressesActive, .active_count = CurrentMetrics::AddressesActive,
.banned_count = CurrentMetrics::AddressesBanned,
}; };
} }
@ -47,7 +51,7 @@ HostResolver::WeakPtr HostResolver::getWeakFromThis()
HostResolver::HostResolver(String host_, Poco::Timespan history_) HostResolver::HostResolver(String host_, Poco::Timespan history_)
: host(std::move(host_)) : host(std::move(host_))
, history(history_) , history(history_)
, resolve_function([](const String & host_to_resolve) { return DNSResolver::instance().resolveHostAll(host_to_resolve); }) , resolve_function([](const String & host_to_resolve) { return DNSResolver::instance().resolveHostAllInOriginOrder(host_to_resolve); })
{ {
update(); update();
} }
@ -62,6 +66,12 @@ HostResolver::HostResolver(
HostResolver::~HostResolver() HostResolver::~HostResolver()
{ {
std::lock_guard lock(mutex); std::lock_guard lock(mutex);
auto banned_count = 0;
for (const auto & rec: records)
banned_count += rec.failed;
CurrentMetrics::sub(metrics.banned_count, banned_count);
CurrentMetrics::sub(metrics.active_count, records.size()); CurrentMetrics::sub(metrics.active_count, records.size());
records.clear(); records.clear();
} }
@ -113,6 +123,7 @@ void HostResolver::updateWeights()
if (getTotalWeight() == 0 && !records.empty()) if (getTotalWeight() == 0 && !records.empty())
{ {
CurrentMetrics::sub(metrics.banned_count, records.size());
for (auto & rec : records) for (auto & rec : records)
rec.failed = false; rec.failed = false;
@ -158,7 +169,8 @@ void HostResolver::setFail(const Poco::Net::IPAddress & address)
if (it == records.end()) if (it == records.end())
return; return;
it->setFail(now); if (it->setFail(now))
CurrentMetrics::add(metrics.banned_count);
} }
ProfileEvents::increment(metrics.failed); ProfileEvents::increment(metrics.failed);
@ -215,17 +227,20 @@ void HostResolver::updateImpl(Poco::Timestamp now, std::vector<Poco::Net::IPAddr
{ {
CurrentMetrics::sub(metrics.active_count, 1); CurrentMetrics::sub(metrics.active_count, 1);
ProfileEvents::increment(metrics.expired, 1); ProfileEvents::increment(metrics.expired, 1);
if (it_before->failed)
CurrentMetrics::sub(metrics.banned_count);
} }
++it_before; ++it_before;
} }
else if (it_before == records.end() || (it_next != next_gen.end() && *it_next < it_before->address)) else if (it_before == records.end() || (it_next != next_gen.end() && *it_next < it_before->address))
{ {
CurrentMetrics::add(metrics.active_count, 1); /// there are could be duplicates in next_gen vector
ProfileEvents::increment(metrics.discovered, 1);
if (merged.empty() || merged.back().address != *it_next) if (merged.empty() || merged.back().address != *it_next)
{
CurrentMetrics::add(metrics.active_count, 1);
ProfileEvents::increment(metrics.discovered, 1);
merged.push_back(Record(*it_next, now)); merged.push_back(Record(*it_next, now));
else }
merged.back().resolve_time = now;
++it_next; ++it_next;
} }
else else
@ -239,9 +254,22 @@ void HostResolver::updateImpl(Poco::Timestamp now, std::vector<Poco::Net::IPAddr
} }
for (auto & rec : merged) for (auto & rec : merged)
rec.cleanTimeoutedFailedFlag(now, history); {
if (!rec.failed)
continue;
/// Exponential increased time for each consecutive fail
auto banned_until = now - Poco::Timespan(history.totalMicroseconds() * (1ull << (rec.consecutive_fail_count - 1)));
if (rec.fail_time < banned_until)
{
rec.failed = false;
CurrentMetrics::sub(metrics.banned_count);
}
}
chassert(std::is_sorted(merged.begin(), merged.end())); chassert(std::is_sorted(merged.begin(), merged.end()));
// check that merged contains unuque elements
chassert(std::adjacent_find(merged.begin(), merged.end()) == merged.end());
last_resolve_time = now; last_resolve_time = now;
records.swap(merged); records.swap(merged);

View File

@ -39,6 +39,7 @@ struct HostResolverMetrics
const ProfileEvents::Event failed = ProfileEvents::end(); const ProfileEvents::Event failed = ProfileEvents::end();
const CurrentMetrics::Metric active_count = CurrentMetrics::end(); const CurrentMetrics::Metric active_count = CurrentMetrics::end();
const CurrentMetrics::Metric banned_count = CurrentMetrics::end();
}; };
constexpr size_t DEFAULT_RESOLVE_TIME_HISTORY_SECONDS = 2*60; constexpr size_t DEFAULT_RESOLVE_TIME_HISTORY_SECONDS = 2*60;
@ -151,6 +152,11 @@ protected:
return address < r.address; return address < r.address;
} }
bool operator ==(const Record & r) const
{
return address == r.address;
}
size_t getWeight() const size_t getWeight() const
{ {
if (failed) if (failed)
@ -169,21 +175,20 @@ protected:
return 10; return 10;
} }
void cleanTimeoutedFailedFlag(const Poco::Timestamp & now, const Poco::Timespan & keep_history) bool setFail(const Poco::Timestamp & now)
{ {
if (!failed) bool was_ok = !failed;
return;
/// Exponential increased time between flag cleanups
if (fail_time < now - Poco::Timespan(keep_history.totalSeconds() * (1ull << (consecutive_fail_count - 1)), 0))
failed = false;
}
void setFail(const Poco::Timestamp & now)
{
failed = true; failed = true;
fail_time = now; fail_time = now;
if (consecutive_fail_count < RECORD_CONSECTIVE_FAIL_COUNT_LIMIT)
++consecutive_fail_count; if (was_ok)
{
if (consecutive_fail_count < RECORD_CONSECTIVE_FAIL_COUNT_LIMIT)
++consecutive_fail_count;
}
return was_ok;
} }
void setSuccess() void setSuccess()
@ -214,7 +219,7 @@ protected:
std::mutex mutex; std::mutex mutex;
Poco::Timestamp last_resolve_time TSA_GUARDED_BY(mutex); Poco::Timestamp last_resolve_time TSA_GUARDED_BY(mutex) = Poco::Timestamp::TIMEVAL_MIN;
Records records TSA_GUARDED_BY(mutex); Records records TSA_GUARDED_BY(mutex);
Poco::Logger * log = &Poco::Logger::get("ConnectionPool"); Poco::Logger * log = &Poco::Logger::get("ConnectionPool");

View File

@ -2,7 +2,9 @@
#include <base/sleep.h> #include <base/sleep.h>
#include <Common/CurrentThread.h> #include <Common/CurrentThread.h>
#include <Common/HostResolvePool.h> #include <Common/HostResolvePool.h>
#include "base/defines.h"
#include <optional>
#include <thread> #include <thread>
#include <gtest/gtest.h> #include <gtest/gtest.h>
@ -29,8 +31,9 @@ protected:
DB::CurrentThread::getProfileEvents().reset(); DB::CurrentThread::getProfileEvents().reset();
ASSERT_EQ(0, CurrentMetrics::get(metrics.active_count)); ASSERT_EQ(0, CurrentMetrics::get(metrics.active_count));
ASSERT_EQ(0, CurrentMetrics::get(metrics.banned_count));
addresses = std::set<String>{"127.0.0.1", "127.0.0.2", "127.0.0.3"}; addresses = std::multiset<String>{"127.0.0.1", "127.0.0.2", "127.0.0.3"};
// Code here will be called immediately after the constructor (right // Code here will be called immediately after the constructor (right
// before each test). // before each test).
} }
@ -58,7 +61,7 @@ protected:
} }
DB::HostResolverMetrics metrics = DB::HostResolver::getMetrics(); DB::HostResolverMetrics metrics = DB::HostResolver::getMetrics();
std::set<String> addresses; std::multiset<String> addresses;
}; };
TEST_F(ResolvePoolTest, CanResolve) TEST_F(ResolvePoolTest, CanResolve)
@ -160,7 +163,7 @@ TEST_F(ResolvePoolTest, CanMerge)
ASSERT_EQ(addresses.size(), DB::CurrentThread::getProfileEvents()[metrics.discovered]); ASSERT_EQ(addresses.size(), DB::CurrentThread::getProfileEvents()[metrics.discovered]);
auto old_addresses = addresses; auto old_addresses = addresses;
addresses = std::set<String>{"127.0.0.4", "127.0.0.5"}; addresses = std::multiset<String>{"127.0.0.4", "127.0.0.5"};
resolver->update(); resolver->update();
@ -229,6 +232,7 @@ TEST_F(ResolvePoolTest, CanFail)
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.failed]); ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.failed]);
ASSERT_EQ(addresses.size(), CurrentMetrics::get(metrics.active_count)); ASSERT_EQ(addresses.size(), CurrentMetrics::get(metrics.active_count));
ASSERT_EQ(1, CurrentMetrics::get(metrics.banned_count));
ASSERT_EQ(addresses.size(), DB::CurrentThread::getProfileEvents()[metrics.discovered]); ASSERT_EQ(addresses.size(), DB::CurrentThread::getProfileEvents()[metrics.discovered]);
for (size_t i = 0; i < 1000; ++i) for (size_t i = 0; i < 1000; ++i)
@ -243,15 +247,20 @@ TEST_F(ResolvePoolTest, CanFail)
TEST_F(ResolvePoolTest, CanFailAndHeal) TEST_F(ResolvePoolTest, CanFailAndHeal)
{ {
auto resolver = make_resolver(); auto resolver = make_resolver();
ASSERT_EQ(0, CurrentMetrics::get(metrics.banned_count));
auto failed_addr = resolver->resolve(); auto failed_addr = resolver->resolve();
failed_addr.setFail(); failed_addr.setFail();
ASSERT_EQ(1, CurrentMetrics::get(metrics.banned_count));
while (true) while (true)
{ {
auto next_addr = resolver->resolve(); auto next_addr = resolver->resolve();
if (*failed_addr == *next_addr) if (*failed_addr == *next_addr)
{
ASSERT_EQ(0, CurrentMetrics::get(metrics.banned_count));
break; break;
}
} }
} }
@ -277,3 +286,123 @@ TEST_F(ResolvePoolTest, CanExpire)
ASSERT_EQ(addresses.size() + 1, DB::CurrentThread::getProfileEvents()[metrics.discovered]); ASSERT_EQ(addresses.size() + 1, DB::CurrentThread::getProfileEvents()[metrics.discovered]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.expired]); ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.expired]);
} }
TEST_F(ResolvePoolTest, DuplicatesInAddresses)
{
auto resolver = make_resolver();
size_t unuque_addresses = addresses.size();
ASSERT_EQ(3, unuque_addresses);
ASSERT_EQ(3, DB::CurrentThread::getProfileEvents()[metrics.discovered]);
ASSERT_TRUE(!addresses.empty());
addresses.insert(*addresses.begin());
addresses.insert(*addresses.begin());
size_t total_addresses = addresses.size();
ASSERT_EQ(addresses.count(*addresses.begin()), 3);
ASSERT_EQ(unuque_addresses + 2, total_addresses);
resolver->update();
ASSERT_EQ(3, DB::CurrentThread::getProfileEvents()[metrics.discovered]);
}
void check_no_failed_address(size_t iteration, auto & resolver, auto & addresses, auto & failed_addr, auto & metrics)
{
ASSERT_EQ(iteration, DB::CurrentThread::getProfileEvents()[metrics.failed]);
for (size_t i = 0; i < 100; ++i)
{
auto next_addr = resolver->resolve();
ASSERT_TRUE(addresses.contains(*next_addr));
ASSERT_NE(*next_addr, *failed_addr);
}
}
TEST_F(ResolvePoolTest, BannedForConsiquenceFail)
{
size_t history_ms = 5;
auto resolver = make_resolver(history_ms);
auto failed_addr = resolver->resolve();
ASSERT_TRUE(addresses.contains(*failed_addr));
failed_addr.setFail();
ASSERT_EQ(3, CurrentMetrics::get(metrics.active_count));
ASSERT_EQ(1, CurrentMetrics::get(metrics.banned_count));
check_no_failed_address(1, resolver, addresses, failed_addr, metrics);
sleepForMilliseconds(history_ms + 1);
resolver->update();
ASSERT_EQ(3, CurrentMetrics::get(metrics.active_count));
ASSERT_EQ(0, CurrentMetrics::get(metrics.banned_count));
failed_addr.setFail();
check_no_failed_address(2, resolver, addresses, failed_addr, metrics);
sleepForMilliseconds(history_ms + 1);
resolver->update();
ASSERT_EQ(3, CurrentMetrics::get(metrics.active_count));
ASSERT_EQ(1, CurrentMetrics::get(metrics.banned_count));
// ip still banned adter history_ms + update, because it was his second consiquent fail
check_no_failed_address(2, resolver, addresses, failed_addr, metrics);
}
TEST_F(ResolvePoolTest, NoAditionalBannForConcurrentFail)
{
size_t history_ms = 5;
auto resolver = make_resolver(history_ms);
auto failed_addr = resolver->resolve();
ASSERT_TRUE(addresses.contains(*failed_addr));
failed_addr.setFail();
failed_addr.setFail();
failed_addr.setFail();
ASSERT_EQ(3, CurrentMetrics::get(metrics.active_count));
ASSERT_EQ(1, CurrentMetrics::get(metrics.banned_count));
check_no_failed_address(3, resolver, addresses, failed_addr, metrics);
sleepForMilliseconds(history_ms + 1);
resolver->update();
// ip is cleared after just 1 history_ms interval.
ASSERT_EQ(3, CurrentMetrics::get(metrics.active_count));
ASSERT_EQ(0, CurrentMetrics::get(metrics.banned_count));
}
TEST_F(ResolvePoolTest, StillBannedAfterSuccess)
{
size_t history_ms = 5;
auto resolver = make_resolver(history_ms);
auto failed_addr = resolver->resolve();
ASSERT_TRUE(addresses.contains(*failed_addr));
std::optional<decltype(resolver->resolve())> again_addr;
while (true)
{
auto addr = resolver->resolve();
if (*addr == *failed_addr)
{
again_addr.emplace(std::move(addr));
break;
}
}
chassert(again_addr);
failed_addr.setFail();
ASSERT_EQ(3, CurrentMetrics::get(metrics.active_count));
ASSERT_EQ(1, CurrentMetrics::get(metrics.banned_count));
check_no_failed_address(1, resolver, addresses, failed_addr, metrics);
again_addr = std::nullopt; // success;
ASSERT_EQ(3, CurrentMetrics::get(metrics.active_count));
ASSERT_EQ(1, CurrentMetrics::get(metrics.banned_count));
}