work with review notes

This commit is contained in:
Sema Checherinda 2024-03-07 13:56:51 +01:00
parent f7f1d86e66
commit 4df406d3ad
6 changed files with 45 additions and 14 deletions

View File

@ -212,8 +212,9 @@ void Connection::connect(const ConnectionTimeouts & timeouts)
/// Remove this possible stale entry from cache
DNSResolver::instance().removeHostFromCache(host);
/// Add server address to exception. Also Exception will remember stack trace. It's a pity that more precise exception type is lost.
throw NetException(ErrorCodes::NETWORK_ERROR, "{} ({})", e.displayText(), getDescription());
/// Add server address to exception. Exception will preserve stack trace.
e.addMessage("({})", getDescription());
throw;
}
catch (Poco::Net::NetException & e)
{
@ -222,7 +223,7 @@ void Connection::connect(const ConnectionTimeouts & timeouts)
/// Remove this possible stale entry from cache
DNSResolver::instance().removeHostFromCache(host);
/// Add server address to exception. Also Exception will remember stack trace. It's a pity that more precise exception type is lost.
/// Add server address to exception. Also Exception will remember new stack trace. It's a pity that more precise exception type is lost.
throw NetException(ErrorCodes::NETWORK_ERROR, "{} ({})", e.displayText(), getDescription());
}
catch (Poco::TimeoutException & e)
@ -232,7 +233,7 @@ void Connection::connect(const ConnectionTimeouts & timeouts)
/// Remove this possible stale entry from cache
DNSResolver::instance().removeHostFromCache(host);
/// Add server address to exception. Also Exception will remember stack trace. It's a pity that more precise exception type is lost.
/// Add server address to exception. Also Exception will remember new stack trace. It's a pity that more precise exception type is lost.
/// This exception can only be thrown from socket->connect(), so add information about connection timeout.
const auto & connection_timeout = static_cast<bool>(secure) ? timeouts.secure_connection_timeout : timeouts.connection_timeout;
throw NetException(

View File

@ -336,7 +336,7 @@ private:
request_stream_completed = false;
response_stream = nullptr;
response_stream_completed = true;
response_stream_completed = false;
return result;
}

View File

@ -13,7 +13,7 @@ namespace ProfileEvents
{
extern const Event AddressesDiscovered;
extern const Event AddressesExpired;
extern const Event AddressesFailScored;
extern const Event AddressesMarkedAsFailed;
}
namespace CurrentMetrics
@ -34,7 +34,7 @@ HostResolverMetrics HostResolver::getMetrics()
return HostResolverMetrics{
.discovered = ProfileEvents::AddressesDiscovered,
.expired = ProfileEvents::AddressesExpired,
.failed = ProfileEvents::AddressesFailScored,
.failed = ProfileEvents::AddressesMarkedAsFailed,
.active_count = CurrentMetrics::AddressesActive,
};
}
@ -120,7 +120,6 @@ void HostResolver::updateWeights()
}
chassert((getTotalWeight() > 0 && !records.empty()) || records.empty());
random_weight_picker = std::uniform_int_distribution<size_t>(0, getTotalWeight() - 1);
}
HostResolver::Entry HostResolver::resolve()
@ -170,6 +169,7 @@ void HostResolver::setFail(const Poco::Net::IPAddress & address)
Poco::Net::IPAddress HostResolver::selectBest()
{
chassert(!records.empty());
auto random_weight_picker = std::uniform_int_distribution<size_t>(0, getTotalWeight() - 1);
size_t weight = random_weight_picker(thread_local_rng);
auto it = std::partition_point(records.begin(), records.end(), [&](const Record & rec) { return rec.weight_prefix_sum <= weight; });
chassert(it != records.end());
@ -178,8 +178,13 @@ Poco::Net::IPAddress HostResolver::selectBest()
HostResolver::Records::iterator HostResolver::find(const Poco::Net::IPAddress & addr) TSA_REQUIRES(mutex)
{
return std::lower_bound(
auto it = std::lower_bound(
records.begin(), records.end(), addr, [](const Record & rec, const Poco::Net::IPAddress & value) { return rec.address < value; });
if (it != records.end() && it->address != addr)
return records.end();
return it;
}
bool HostResolver::isUpdateNeeded()

View File

@ -191,8 +191,6 @@ protected:
Poco::Timestamp last_resolve_time TSA_GUARDED_BY(mutex);
Records records TSA_GUARDED_BY(mutex);
std::uniform_int_distribution<size_t> random_weight_picker TSA_GUARDED_BY(mutex);
Poco::Logger * log = &Poco::Logger::get("ConnectionPool");
};

View File

@ -721,9 +721,9 @@ The server successfully detected this situation and will download merged part fr
M(HTTPConnectionsErrors, "Number of cases when creation of a http connection failed") \
M(HTTPConnectionsElapsedMicroseconds, "Total time spend on creating http connections") \
\
M(AddressesDiscovered, "Total count of new addresses in dns resolve results for connection pools") \
M(AddressesExpired, "Total count of expired addresses which is no longer presented in dns resolve results for for connection pools") \
M(AddressesFailScored, "Total count of new addresses in dns resolve results for for connection pools") \
M(AddressesDiscovered, "Total count of new addresses in dns resolve results for http connections") \
M(AddressesExpired, "Total count of expired addresses which is no longer presented in dns resolve results for http connections") \
M(AddressesMarkedAsFailed, "Total count of addresses which has been marked as faulty due to connection errors for http connections") \
#ifdef APPLY_FOR_EXTERNAL_EVENTS

View File

@ -552,6 +552,33 @@ TEST_F(ConnectionPoolTest, HardLimit)
ASSERT_EQ(0, CurrentMetrics::get(pool->getMetrics().stored_count));
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[pool->getMetrics().created]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[pool->getMetrics().preserved]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[pool->getMetrics().reset]);
}
TEST_F(ConnectionPoolTest, NoReceiveCall)
{
auto pool = getPool();
{
auto connection = pool->getConnection(timeouts);
{
auto data = String("Hello");
Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_PUT, "/", "HTTP/1.1"); // HTTP/1.1 is required for keep alive
request.setContentLength(data.size());
std::ostream & ostream = connection->sendRequest(request);
ostream << data;
}
connection->flushRequest();
}
ASSERT_EQ(0, CurrentMetrics::get(pool->getMetrics().active_count));
ASSERT_EQ(0, CurrentMetrics::get(pool->getMetrics().stored_count));
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[pool->getMetrics().created]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[pool->getMetrics().preserved]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[pool->getMetrics().reset]);