add logs and metrics about rejected connections in Poco

This commit is contained in:
Alexander Tokmakov 2024-07-12 01:01:52 +02:00
parent b0580eb575
commit 243edcc8aa
12 changed files with 367 additions and 261 deletions

View File

@ -21,6 +21,7 @@
#include "Poco/Exception.h" #include "Poco/Exception.h"
#include "Poco/Foundation.h" #include "Poco/Foundation.h"
#include "Poco/Mutex.h" #include "Poco/Mutex.h"
#include "Poco/Message.h"
namespace Poco namespace Poco
@ -78,6 +79,10 @@ public:
/// ///
/// The default implementation just breaks into the debugger. /// The default implementation just breaks into the debugger.
virtual void logMessageImpl(Message::Priority priority, const std::string & msg) {}
/// Write a messages to the log
/// Useful for logging from Poco
static void handle(const Exception & exc); static void handle(const Exception & exc);
/// Invokes the currently registered ErrorHandler. /// Invokes the currently registered ErrorHandler.
@ -87,6 +92,9 @@ public:
static void handle(); static void handle();
/// Invokes the currently registered ErrorHandler. /// Invokes the currently registered ErrorHandler.
static void logMessage(Message::Priority priority, const std::string & msg);
/// Invokes the currently registered ErrorHandler.
static ErrorHandler * set(ErrorHandler * pHandler); static ErrorHandler * set(ErrorHandler * pHandler);
/// Registers the given handler as the current error handler. /// Registers the given handler as the current error handler.
/// ///

View File

@ -89,6 +89,18 @@ void ErrorHandler::handle()
} }
} }
void ErrorHandler::logMessage(Message::Priority priority, const std::string & msg)
{
FastMutex::ScopedLock lock(_mutex);
try
{
_pHandler->logMessageImpl(priority, msg);
}
catch (...)
{
}
}
ErrorHandler* ErrorHandler::set(ErrorHandler* pHandler) ErrorHandler* ErrorHandler::set(ErrorHandler* pHandler)
{ {

View File

@ -17,6 +17,7 @@
#include "Poco/Net/StreamSocketImpl.h" #include "Poco/Net/StreamSocketImpl.h"
#include "Poco/NumberFormatter.h" #include "Poco/NumberFormatter.h"
#include "Poco/Timestamp.h" #include "Poco/Timestamp.h"
#include "Poco/ErrorHandler.h"
#include <string.h> // FD_SET needs memset on some platforms, so we can't use <cstring> #include <string.h> // FD_SET needs memset on some platforms, so we can't use <cstring>

View File

@ -147,6 +147,10 @@ void TCPServer::run()
} }
_pDispatcher->enqueue(ss); _pDispatcher->enqueue(ss);
} }
else
{
ErrorHandler::logMessage(Message::PRIO_WARNING, "Filtered out connection from " + ss.peerAddress().toString());
}
} }
catch (Poco::Exception& exc) catch (Poco::Exception& exc)
{ {

View File

@ -143,8 +143,23 @@ void TCPServerDispatcher::enqueue(const StreamSocket& socket)
{ {
FastMutex::ScopedLock lock(_mutex); FastMutex::ScopedLock lock(_mutex);
ErrorHandler::logMessage(Message::PRIO_TEST, "Queue size: " + std::to_string(_queue.size()) +
", current threads: " + std::to_string(_currentThreads) +
", threads in pool: " + std::to_string(_threadPool.allocated()) +
", current connections: " + std::to_string(_currentConnections));
if (_queue.size() < _pParams->getMaxQueued()) if (_queue.size() < _pParams->getMaxQueued())
{ {
/// NOTE: the condition below is wrong.
/// Since the thread pool is shared between multiple servers/TCPServerDispatchers,
/// _currentThreads < _pParams->getMaxThreads() will be true when the pool is actually saturated.
/// As a result, queue is useless and connections never wait in queue.
/// Instead, we (mistakenly) think that we can create a thread for this connection, but we fail to create it
/// and the connection get rejected.
/// We could check _currentThreads < _threadPool.allocated() to make it work,
/// but it's not clear if we want to make it work
/// because it may be better to reject connection immediately if we don't have resources to handle it.
if (!_queue.hasIdleThreads() && _currentThreads < _pParams->getMaxThreads()) if (!_queue.hasIdleThreads() && _currentThreads < _pParams->getMaxThreads())
{ {
try try
@ -155,17 +170,25 @@ void TCPServerDispatcher::enqueue(const StreamSocket& socket)
} }
catch (Poco::Exception& exc) catch (Poco::Exception& exc)
{ {
ErrorHandler::logMessage(Message::PRIO_WARNING, "Got an exception while starting thread for connection from " +
socket.peerAddress().toString());
ErrorHandler::handle(exc);
this->release(); this->release();
++_refusedConnections; ++_refusedConnections;
std::cerr << "Got exception while starting thread for connection. Error code: "
<< exc.code() << ", message: '" << exc.displayText() << "'" << std::endl;
return; return;
} }
} }
else if (!_queue.hasIdleThreads())
{
ErrorHandler::logMessage(Message::PRIO_TRACE, "Don't have idle threads, adding connection from " +
socket.peerAddress().toString() + " to the queue, size: " + std::to_string(_queue.size()));
}
_queue.enqueueNotification(new TCPConnectionNotification(socket)); _queue.enqueueNotification(new TCPConnectionNotification(socket));
} }
else else
{ {
ErrorHandler::logMessage(Message::PRIO_WARNING, "Refusing connection from " + socket.peerAddress().toString() +
", reached max queue size " + std::to_string(_pParams->getMaxQueued()));
++_refusedConnections; ++_refusedConnections;
} }
} }

View File

@ -410,7 +410,7 @@ try
std::lock_guard lock(servers_lock); std::lock_guard lock(servers_lock);
metrics.reserve(servers->size()); metrics.reserve(servers->size());
for (const auto & server : *servers) for (const auto & server : *servers)
metrics.emplace_back(ProtocolServerMetrics{server.getPortName(), server.currentThreads()}); metrics.emplace_back(ProtocolServerMetrics{server.getPortName(), server.currentThreads(), server.refusedConnections()});
return metrics; return metrics;
} }
); );

View File

@ -909,10 +909,10 @@ try
metrics.reserve(servers_to_start_before_tables.size() + servers.size()); metrics.reserve(servers_to_start_before_tables.size() + servers.size());
for (const auto & server : servers_to_start_before_tables) for (const auto & server : servers_to_start_before_tables)
metrics.emplace_back(ProtocolServerMetrics{server.getPortName(), server.currentThreads()}); metrics.emplace_back(ProtocolServerMetrics{server.getPortName(), server.currentThreads(), server.refusedConnections()});
for (const auto & server : servers) for (const auto & server : servers)
metrics.emplace_back(ProtocolServerMetrics{server.getPortName(), server.currentThreads()}); metrics.emplace_back(ProtocolServerMetrics{server.getPortName(), server.currentThreads(), server.refusedConnections()});
return metrics; return metrics;
} }
); );

View File

@ -1613,7 +1613,7 @@ void AsynchronousMetrics::update(TimePoint update_time, bool force_update)
#endif #endif
{ {
auto get_metric_name_doc = [](const String & name) -> std::pair<const char *, const char *> auto threads_get_metric_name_doc = [](const String & name) -> std::pair<const char *, const char *>
{ {
static std::map<String, std::pair<const char *, const char *>> metric_map = static std::map<String, std::pair<const char *, const char *>> metric_map =
{ {
@ -1637,11 +1637,38 @@ void AsynchronousMetrics::update(TimePoint update_time, bool force_update)
return it->second; return it->second;
}; };
auto rejected_connections_get_metric_name_doc = [](const String & name) -> std::pair<const char *, const char *>
{
static std::map<String, std::pair<const char *, const char *>> metric_map =
{
{"tcp_port", {"TCPRejectedConnections", "Number of rejected connections for the TCP protocol (without TLS)."}},
{"tcp_port_secure", {"TCPSecureRejectedConnections", "Number of rejected connections for the TCP protocol (with TLS)."}},
{"http_port", {"HTTPRejectedConnections", "Number of rejected connections for the HTTP interface (without TLS)."}},
{"https_port", {"HTTPSecureRejectedConnections", "Number of rejected connections for the HTTPS interface."}},
{"interserver_http_port", {"InterserverRejectedConnections", "Number of rejected connections for the replicas communication protocol (without TLS)."}},
{"interserver_https_port", {"InterserverSecureRejectedConnections", "Number of rejected connections for the replicas communication protocol (with TLS)."}},
{"mysql_port", {"MySQLRejectedConnections", "Number of rejected connections for the MySQL compatibility protocol."}},
{"postgresql_port", {"PostgreSQLRejectedConnections", "Number of rejected connections for the PostgreSQL compatibility protocol."}},
{"grpc_port", {"GRPCRejectedConnections", "Number of rejected connections for the GRPC protocol."}},
{"prometheus.port", {"PrometheusRejectedConnections", "Number of rejected connections for the Prometheus endpoint. Note: prometheus endpoints can be also used via the usual HTTP/HTTPs ports."}},
{"keeper_server.tcp_port", {"KeeperTCPRejectedConnections", "Number of rejected connections for the Keeper TCP protocol (without TLS)."}},
{"keeper_server.tcp_port_secure", {"KeeperTCPSecureRejectedConnections", "Number of rejected connections for the Keeper TCP protocol (with TLS)."}}
};
auto it = metric_map.find(name);
if (it == metric_map.end())
return { nullptr, nullptr };
else
return it->second;
};
const auto server_metrics = protocol_server_metrics_func(); const auto server_metrics = protocol_server_metrics_func();
for (const auto & server_metric : server_metrics) for (const auto & server_metric : server_metrics)
{ {
if (auto name_doc = get_metric_name_doc(server_metric.port_name); name_doc.first != nullptr) if (auto name_doc = threads_get_metric_name_doc(server_metric.port_name); name_doc.first != nullptr)
new_values[name_doc.first] = { server_metric.current_threads, name_doc.second }; new_values[name_doc.first] = { server_metric.current_threads, name_doc.second };
if (auto name_doc = rejected_connections_get_metric_name_doc(server_metric.port_name); name_doc.first != nullptr)
new_values[name_doc.first] = { server_metric.rejected_connections, name_doc.second };
} }
} }

View File

@ -42,6 +42,7 @@ struct ProtocolServerMetrics
{ {
String port_name; String port_name;
size_t current_threads; size_t current_threads;
size_t rejected_connections;
}; };
/** Periodically (by default, each second) /** Periodically (by default, each second)

View File

@ -2,6 +2,7 @@
#include <Poco/ErrorHandler.h> #include <Poco/ErrorHandler.h>
#include <Common/Exception.h> #include <Common/Exception.h>
#include <Common/logger_useful.h>
/** ErrorHandler for Poco::Thread, /** ErrorHandler for Poco::Thread,
@ -26,8 +27,32 @@ public:
void exception(const std::exception &) override { logException(); } void exception(const std::exception &) override { logException(); }
void exception() override { logException(); } void exception() override { logException(); }
void logMessageImpl(Poco::Message::Priority priority, const std::string & msg) override
{
switch (priority)
{
case Poco::Message::PRIO_FATAL: [[fallthrough]];
case Poco::Message::PRIO_CRITICAL:
LOG_FATAL(trace_log, fmt::runtime(msg)); break;
case Poco::Message::PRIO_ERROR:
LOG_ERROR(trace_log, fmt::runtime(msg)); break;
case Poco::Message::PRIO_WARNING:
LOG_WARNING(trace_log, fmt::runtime(msg)); break;
case Poco::Message::PRIO_NOTICE: [[fallthrough]];
case Poco::Message::PRIO_INFORMATION:
LOG_INFO(trace_log, fmt::runtime(msg)); break;
case Poco::Message::PRIO_DEBUG:
LOG_DEBUG(trace_log, fmt::runtime(msg)); break;
case Poco::Message::PRIO_TRACE:
LOG_TRACE(trace_log, fmt::runtime(msg)); break;
case Poco::Message::PRIO_TEST:
LOG_TEST(trace_log, fmt::runtime(msg)); break;
}
}
private: private:
LoggerPtr log = getLogger("ServerErrorHandler"); LoggerPtr log = getLogger("ServerErrorHandler");
LoggerPtr trace_log = getLogger("Poco");
void logException() void logException()
{ {

View File

@ -20,6 +20,7 @@ public:
UInt16 portNumber() const override { return tcp_server->portNumber(); } UInt16 portNumber() const override { return tcp_server->portNumber(); }
size_t currentConnections() const override { return tcp_server->currentConnections(); } size_t currentConnections() const override { return tcp_server->currentConnections(); }
size_t currentThreads() const override { return tcp_server->currentThreads(); } size_t currentThreads() const override { return tcp_server->currentThreads(); }
size_t refusedConnections() const override { return tcp_server->refusedConnections(); }
private: private:
std::unique_ptr<TCPServer> tcp_server; std::unique_ptr<TCPServer> tcp_server;
@ -54,6 +55,7 @@ public:
UInt16 portNumber() const override { return grpc_server->portNumber(); } UInt16 portNumber() const override { return grpc_server->portNumber(); }
size_t currentConnections() const override { return grpc_server->currentConnections(); } size_t currentConnections() const override { return grpc_server->currentConnections(); }
size_t currentThreads() const override { return grpc_server->currentThreads(); } size_t currentThreads() const override { return grpc_server->currentThreads(); }
size_t refusedConnections() const override { return 0; }
private: private:
std::unique_ptr<GRPCServer> grpc_server; std::unique_ptr<GRPCServer> grpc_server;

View File

@ -38,6 +38,8 @@ public:
/// Returns the number of currently handled connections. /// Returns the number of currently handled connections.
size_t currentConnections() const { return impl->currentConnections(); } size_t currentConnections() const { return impl->currentConnections(); }
size_t refusedConnections() const { return impl->refusedConnections(); }
/// Returns the number of current threads. /// Returns the number of current threads.
size_t currentThreads() const { return impl->currentThreads(); } size_t currentThreads() const { return impl->currentThreads(); }
@ -61,6 +63,7 @@ private:
virtual UInt16 portNumber() const = 0; virtual UInt16 portNumber() const = 0;
virtual size_t currentConnections() const = 0; virtual size_t currentConnections() const = 0;
virtual size_t currentThreads() const = 0; virtual size_t currentThreads() const = 0;
virtual size_t refusedConnections() const = 0;
}; };
class TCPServerAdapterImpl; class TCPServerAdapterImpl;
class GRPCServerAdapterImpl; class GRPCServerAdapterImpl;