Merge pull request #17463 from amosbird/threadmetrics

Add connection thread metrics
This commit is contained in:
alexey-milovidov 2020-12-18 21:22:51 +03:00 committed by GitHub
commit e0c2de9a8c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 131 additions and 57 deletions

View File

@ -76,6 +76,8 @@
# define NO_SANITIZE_THREAD
#endif
/// A macro for suppressing warnings about unused variables or function results.
/// Useful for structured bindings which have no standard way to declare this.
#define UNUSED(...) (void)(__VA_ARGS__)
/// A template function for suppressing warnings about unused variables or function results.
template <typename... Args>
constexpr void UNUSED(Args &&... args [[maybe_unused]])
{
}

View File

@ -785,17 +785,17 @@ int Server::main(const std::vector<std::string> & /*args*/)
for (const auto & listen_host : listen_hosts)
{
/// TCP TestKeeper
createServer(listen_host, "test_keeper_server.tcp_port", listen_try, [&](UInt16 port)
const char * port_name = "test_keeper_server.tcp_port";
createServer(listen_host, port_name, listen_try, [&](UInt16 port)
{
Poco::Net::ServerSocket socket;
auto address = socketBindListen(socket, listen_host, port);
socket.setReceiveTimeout(settings.receive_timeout);
socket.setSendTimeout(settings.send_timeout);
servers_to_start_before_tables.emplace_back(std::make_unique<Poco::Net::TCPServer>(
new TestKeeperTCPHandlerFactory(*this),
server_pool,
socket,
new Poco::Net::TCPServerParams));
servers_to_start_before_tables.emplace_back(
port_name,
std::make_unique<Poco::Net::TCPServer>(
new TestKeeperTCPHandlerFactory(*this), server_pool, socket, new Poco::Net::TCPServerParams));
LOG_INFO(log, "Listening for connections to fake zookeeper (tcp): {}", address.toString());
});
@ -981,35 +981,37 @@ int Server::main(const std::vector<std::string> & /*args*/)
std::vector<ProtocolServerAdapter> servers;
{
/// This object will periodically calculate some metrics.
AsynchronousMetrics async_metrics(*global_context,
config().getUInt("asynchronous_metrics_update_period_s", 60));
AsynchronousMetrics async_metrics(
*global_context, config().getUInt("asynchronous_metrics_update_period_s", 60), servers_to_start_before_tables, servers);
attachSystemTablesAsync(*DatabaseCatalog::instance().getSystemDatabase(), async_metrics);
for (const auto & listen_host : listen_hosts)
{
/// HTTP
createServer(listen_host, "http_port", listen_try, [&](UInt16 port)
const char * port_name = "http_port";
createServer(listen_host, port_name, listen_try, [&](UInt16 port)
{
Poco::Net::ServerSocket socket;
auto address = socketBindListen(socket, listen_host, port);
socket.setReceiveTimeout(settings.http_receive_timeout);
socket.setSendTimeout(settings.http_send_timeout);
servers.emplace_back(std::make_unique<Poco::Net::HTTPServer>(
servers.emplace_back(port_name, std::make_unique<Poco::Net::HTTPServer>(
createHandlerFactory(*this, async_metrics, "HTTPHandler-factory"), server_pool, socket, http_params));
LOG_INFO(log, "Listening for http://{}", address.toString());
});
/// HTTPS
createServer(listen_host, "https_port", listen_try, [&](UInt16 port)
port_name = "https_port";
createServer(listen_host, port_name, listen_try, [&](UInt16 port)
{
#if USE_SSL
Poco::Net::SecureServerSocket socket;
auto address = socketBindListen(socket, listen_host, port, /* secure = */ true);
socket.setReceiveTimeout(settings.http_receive_timeout);
socket.setSendTimeout(settings.http_send_timeout);
servers.emplace_back(std::make_unique<Poco::Net::HTTPServer>(
servers.emplace_back(port_name, std::make_unique<Poco::Net::HTTPServer>(
createHandlerFactory(*this, async_metrics, "HTTPSHandler-factory"), server_pool, socket, http_params));
LOG_INFO(log, "Listening for https://{}", address.toString());
@ -1021,13 +1023,14 @@ int Server::main(const std::vector<std::string> & /*args*/)
});
/// TCP
createServer(listen_host, "tcp_port", listen_try, [&](UInt16 port)
port_name = "tcp_port";
createServer(listen_host, port_name, listen_try, [&](UInt16 port)
{
Poco::Net::ServerSocket socket;
auto address = socketBindListen(socket, listen_host, port);
socket.setReceiveTimeout(settings.receive_timeout);
socket.setSendTimeout(settings.send_timeout);
servers.emplace_back(std::make_unique<Poco::Net::TCPServer>(
servers.emplace_back(port_name, std::make_unique<Poco::Net::TCPServer>(
new TCPHandlerFactory(*this, /* secure */ false, /* proxy protocol */ false),
server_pool,
socket,
@ -1037,13 +1040,14 @@ int Server::main(const std::vector<std::string> & /*args*/)
});
/// TCP with PROXY protocol, see https://github.com/wolfeidau/proxyv2/blob/master/docs/proxy-protocol.txt
createServer(listen_host, "tcp_with_proxy_port", listen_try, [&](UInt16 port)
port_name = "tcp_with_proxy_port";
createServer(listen_host, port_name, listen_try, [&](UInt16 port)
{
Poco::Net::ServerSocket socket;
auto address = socketBindListen(socket, listen_host, port);
socket.setReceiveTimeout(settings.receive_timeout);
socket.setSendTimeout(settings.send_timeout);
servers.emplace_back(std::make_unique<Poco::Net::TCPServer>(
servers.emplace_back(port_name, std::make_unique<Poco::Net::TCPServer>(
new TCPHandlerFactory(*this, /* secure */ false, /* proxy protocol */ true),
server_pool,
socket,
@ -1053,14 +1057,15 @@ int Server::main(const std::vector<std::string> & /*args*/)
});
/// TCP with SSL
createServer(listen_host, "tcp_port_secure", listen_try, [&](UInt16 port)
port_name = "tcp_port_secure";
createServer(listen_host, port_name, listen_try, [&](UInt16 port)
{
#if USE_SSL
Poco::Net::SecureServerSocket socket;
auto address = socketBindListen(socket, listen_host, port, /* secure = */ true);
socket.setReceiveTimeout(settings.receive_timeout);
socket.setSendTimeout(settings.send_timeout);
servers.emplace_back(std::make_unique<Poco::Net::TCPServer>(
servers.emplace_back(port_name, std::make_unique<Poco::Net::TCPServer>(
new TCPHandlerFactory(*this, /* secure */ true, /* proxy protocol */ false),
server_pool,
socket,
@ -1074,26 +1079,28 @@ int Server::main(const std::vector<std::string> & /*args*/)
});
/// Interserver IO HTTP
createServer(listen_host, "interserver_http_port", listen_try, [&](UInt16 port)
port_name = "interserver_http_port";
createServer(listen_host, port_name, listen_try, [&](UInt16 port)
{
Poco::Net::ServerSocket socket;
auto address = socketBindListen(socket, listen_host, port);
socket.setReceiveTimeout(settings.http_receive_timeout);
socket.setSendTimeout(settings.http_send_timeout);
servers.emplace_back(std::make_unique<Poco::Net::HTTPServer>(
servers.emplace_back(port_name, std::make_unique<Poco::Net::HTTPServer>(
createHandlerFactory(*this, async_metrics, "InterserverIOHTTPHandler-factory"), server_pool, socket, http_params));
LOG_INFO(log, "Listening for replica communication (interserver): http://{}", address.toString());
});
createServer(listen_host, "interserver_https_port", listen_try, [&](UInt16 port)
port_name = "interserver_https_port";
createServer(listen_host, port_name, listen_try, [&](UInt16 port)
{
#if USE_SSL
Poco::Net::SecureServerSocket socket;
auto address = socketBindListen(socket, listen_host, port, /* secure = */ true);
socket.setReceiveTimeout(settings.http_receive_timeout);
socket.setSendTimeout(settings.http_send_timeout);
servers.emplace_back(std::make_unique<Poco::Net::HTTPServer>(
servers.emplace_back(port_name, std::make_unique<Poco::Net::HTTPServer>(
createHandlerFactory(*this, async_metrics, "InterserverIOHTTPSHandler-factory"), server_pool, socket, http_params));
LOG_INFO(log, "Listening for secure replica communication (interserver): https://{}", address.toString());
@ -1104,13 +1111,14 @@ int Server::main(const std::vector<std::string> & /*args*/)
#endif
});
createServer(listen_host, "mysql_port", listen_try, [&](UInt16 port)
port_name = "mysql_port";
createServer(listen_host, port_name, listen_try, [&](UInt16 port)
{
Poco::Net::ServerSocket socket;
auto address = socketBindListen(socket, listen_host, port, /* secure = */ true);
socket.setReceiveTimeout(Poco::Timespan());
socket.setSendTimeout(settings.send_timeout);
servers.emplace_back(std::make_unique<Poco::Net::TCPServer>(
servers.emplace_back(port_name, std::make_unique<Poco::Net::TCPServer>(
new MySQLHandlerFactory(*this),
server_pool,
socket,
@ -1119,13 +1127,14 @@ int Server::main(const std::vector<std::string> & /*args*/)
LOG_INFO(log, "Listening for MySQL compatibility protocol: {}", address.toString());
});
createServer(listen_host, "postgresql_port", listen_try, [&](UInt16 port)
port_name = "postgresql_port";
createServer(listen_host, port_name, listen_try, [&](UInt16 port)
{
Poco::Net::ServerSocket socket;
auto address = socketBindListen(socket, listen_host, port, /* secure = */ true);
socket.setReceiveTimeout(Poco::Timespan());
socket.setSendTimeout(settings.send_timeout);
servers.emplace_back(std::make_unique<Poco::Net::TCPServer>(
servers.emplace_back(port_name, std::make_unique<Poco::Net::TCPServer>(
new PostgreSQLHandlerFactory(*this),
server_pool,
socket,
@ -1135,22 +1144,24 @@ int Server::main(const std::vector<std::string> & /*args*/)
});
#if USE_GRPC
createServer(listen_host, "grpc_port", listen_try, [&](UInt16 port)
port_name = "grpc_port";
createServer(listen_host, port_name, listen_try, [&](UInt16 port)
{
Poco::Net::SocketAddress server_address(listen_host, port);
servers.emplace_back(std::make_unique<GRPCServer>(*this, makeSocketAddress(listen_host, port, log)));
servers.emplace_back(port_name, std::make_unique<GRPCServer>(*this, makeSocketAddress(listen_host, port, log)));
LOG_INFO(log, "Listening for gRPC protocol: " + server_address.toString());
});
#endif
/// Prometheus (if defined and not setup yet with http_port)
createServer(listen_host, "prometheus.port", listen_try, [&](UInt16 port)
port_name = "prometheus.port";
createServer(listen_host, port_name, listen_try, [&](UInt16 port)
{
Poco::Net::ServerSocket socket;
auto address = socketBindListen(socket, listen_host, port);
socket.setReceiveTimeout(settings.http_receive_timeout);
socket.setSendTimeout(settings.http_send_timeout);
servers.emplace_back(std::make_unique<Poco::Net::HTTPServer>(
servers.emplace_back(port_name, std::make_unique<Poco::Net::HTTPServer>(
createHandlerFactory(*this, async_metrics, "PrometheusHandler-factory"), server_pool, socket, http_params));
LOG_INFO(log, "Listening for Prometheus: http://{}", address.toString());
@ -1161,6 +1172,8 @@ int Server::main(const std::vector<std::string> & /*args*/)
throw Exception("No servers started (add valid listen_host and 'tcp_port' or 'http_port' to configuration file.)",
ErrorCodes::NO_ELEMENTS_IN_CONFIG);
/// Must be done after initialization of `servers`, because async_metrics will access `servers` variable from its thread.
async_metrics.start();
global_context->enableNamedSessions();
for (auto & server : servers)

View File

@ -7,6 +7,7 @@
#include <Common/setThreadName.h>
#include <Common/CurrentMetrics.h>
#include <Common/typeid_cast.h>
#include <Server/ProtocolServerAdapter.h>
#include <Storages/MarkCache.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/StorageReplicatedMergeTree.h>
@ -43,7 +44,8 @@ AsynchronousMetrics::~AsynchronousMetrics()
}
wait_cond.notify_one();
thread.join();
if (thread)
thread->join();
}
catch (...)
{
@ -169,7 +171,7 @@ void AsynchronousMetrics::update()
AsynchronousMetricValues new_values;
{
if (auto mark_cache = context.getMarkCache())
if (auto mark_cache = global_context.getMarkCache())
{
new_values["MarkCacheBytes"] = mark_cache->weight();
new_values["MarkCacheFiles"] = mark_cache->count();
@ -177,7 +179,7 @@ void AsynchronousMetrics::update()
}
{
if (auto uncompressed_cache = context.getUncompressedCache())
if (auto uncompressed_cache = global_context.getUncompressedCache())
{
new_values["UncompressedCacheBytes"] = uncompressed_cache->weight();
new_values["UncompressedCacheCells"] = uncompressed_cache->count();
@ -186,12 +188,12 @@ void AsynchronousMetrics::update()
#if USE_EMBEDDED_COMPILER
{
if (auto compiled_expression_cache = context.getCompiledExpressionCache())
if (auto compiled_expression_cache = global_context.getCompiledExpressionCache())
new_values["CompiledExpressionCacheCount"] = compiled_expression_cache->count();
}
#endif
new_values["Uptime"] = context.getUptimeSeconds();
new_values["Uptime"] = global_context.getUptimeSeconds();
/// Process memory usage according to OS
#if defined(OS_LINUX)
@ -250,7 +252,7 @@ void AsynchronousMetrics::update()
/// Check if database can contain MergeTree tables
if (!db.second->canContainMergeTreeTables())
continue;
for (auto iterator = db.second->getTablesIterator(context); iterator->isValid(); iterator->next())
for (auto iterator = db.second->getTablesIterator(global_context); iterator->isValid(); iterator->next())
{
++total_number_of_tables;
const auto & table = iterator->table();
@ -312,6 +314,39 @@ void AsynchronousMetrics::update()
new_values["NumberOfDatabases"] = number_of_databases;
new_values["NumberOfTables"] = total_number_of_tables;
auto get_metric_name = [](const String & name) -> const char *
{
static std::map<String, const char *> metric_map = {
{"tcp_port", "TCPThreads"},
{"tcp_port_secure", "TCPSecureThreads"},
{"http_port", "HTTPThreads"},
{"https_port", "HTTPSecureThreads"},
{"interserver_http_port", "InterserverThreads"},
{"interserver_https_port", "InterserverSecureThreads"},
{"mysql_port", "MySQLThreads"},
{"postgresql_port", "PostgreSQLThreads"},
{"grpc_port", "GRPCThreads"},
{"prometheus.port", "PrometheusThreads"}
};
auto it = metric_map.find(name);
if (it == metric_map.end())
return nullptr;
else
return it->second;
};
for (const auto & server : servers_to_start_before_tables)
{
if (const auto * name = get_metric_name(server.getPortName()))
new_values[name] = server.currentThreads();
}
for (const auto & server : servers)
{
if (const auto * name = get_metric_name(server.getPortName()))
new_values[name] = server.currentThreads();
}
}
#if USE_JEMALLOC && JEMALLOC_VERSION_MAJOR >= 4
@ -386,7 +421,7 @@ void AsynchronousMetrics::update()
/// Add more metrics as you wish.
// Log the new metrics.
if (auto log = context.getAsynchronousMetricLog())
if (auto log = global_context.getAsynchronousMetricLog())
{
log->addValues(new_values);
}

View File

@ -13,9 +13,10 @@ namespace DB
{
class Context;
class ProtocolServerAdapter;
typedef double AsynchronousMetricValue;
typedef std::unordered_map<std::string, AsynchronousMetricValue> AsynchronousMetricValues;
using AsynchronousMetricValue = double;
using AsynchronousMetricValues = std::unordered_map<std::string, AsynchronousMetricValue>;
/** Periodically (by default, each minute, starting at 30 seconds offset)
@ -28,22 +29,34 @@ public:
// The default value of update_period_seconds is for ClickHouse-over-YT
// in Arcadia -- it uses its own server implementation that also uses these
// metrics.
AsynchronousMetrics(Context & context_, int update_period_seconds = 60)
: context(context_),
update_period(update_period_seconds),
thread([this] { run(); })
AsynchronousMetrics(
Context & global_context_,
int update_period_seconds,
const std::vector<ProtocolServerAdapter> & servers_to_start_before_tables_,
const std::vector<ProtocolServerAdapter> & servers_)
: global_context(global_context_)
, update_period(update_period_seconds)
, servers_to_start_before_tables(servers_to_start_before_tables_)
, servers(servers_)
{
}
~AsynchronousMetrics();
/// Separate method allows to initialize the `servers` variable beforehand.
void start()
{
thread = std::make_unique<ThreadFromGlobalPool>([this] { run(); });
}
/// Returns copy of all values.
AsynchronousMetricValues getValues() const;
private:
Context & context;
Context & global_context;
const std::chrono::seconds update_period;
const std::vector<ProtocolServerAdapter> & servers_to_start_before_tables;
const std::vector<ProtocolServerAdapter> & servers;
mutable std::mutex mutex;
std::condition_variable wait_cond;
@ -54,7 +67,7 @@ private:
MemoryStatisticsOS memory_stat;
#endif
ThreadFromGlobalPool thread;
std::unique_ptr<ThreadFromGlobalPool> thread;
void run();
void update();

View File

@ -35,6 +35,9 @@ public:
/// Returns the number of currently handled connections.
size_t currentConnections() const;
/// Returns the number of current threads.
size_t currentThreads() const { return currentConnections(); }
private:
using GRPCService = clickhouse::grpc::ClickHouse::AsyncService;
class Runner;

View File

@ -17,14 +17,15 @@ public:
void start() override { tcp_server->start(); }
void stop() override { tcp_server->stop(); }
size_t currentConnections() const override { return tcp_server->currentConnections(); }
size_t currentThreads() const override { return tcp_server->currentThreads(); }
private:
std::unique_ptr<Poco::Net::TCPServer> tcp_server;
};
ProtocolServerAdapter::ProtocolServerAdapter(std::unique_ptr<Poco::Net::TCPServer> tcp_server_)
ProtocolServerAdapter::ProtocolServerAdapter(const char * port_name_, std::unique_ptr<Poco::Net::TCPServer> tcp_server_)
: port_name(port_name_), impl(std::make_unique<TCPServerAdapterImpl>(std::move(tcp_server_)))
{
impl = std::make_unique<TCPServerAdapterImpl>(std::move(tcp_server_));
}
#if USE_GRPC
@ -37,14 +38,15 @@ public:
void start() override { grpc_server->start(); }
void stop() override { grpc_server->stop(); }
size_t currentConnections() const override { return grpc_server->currentConnections(); }
size_t currentThreads() const override { return grpc_server->currentThreads(); }
private:
std::unique_ptr<GRPCServer> grpc_server;
};
ProtocolServerAdapter::ProtocolServerAdapter(std::unique_ptr<GRPCServer> grpc_server_)
ProtocolServerAdapter::ProtocolServerAdapter(const char * port_name_, std::unique_ptr<GRPCServer> grpc_server_)
: port_name(port_name_), impl(std::make_unique<GRPCServerAdapterImpl>(std::move(grpc_server_)))
{
impl = std::make_unique<GRPCServerAdapterImpl>(std::move(grpc_server_));
}
#endif
}

View File

@ -5,6 +5,7 @@
#endif
#include <memory>
#include <string>
namespace Poco::Net { class TCPServer; }
@ -16,16 +17,14 @@ class GRPCServer;
/// no matter what type it has (HTTPServer, TCPServer, MySQLServer, GRPCServer, ...).
class ProtocolServerAdapter
{
friend class ProtocolServers;
public:
ProtocolServerAdapter() {}
ProtocolServerAdapter(ProtocolServerAdapter && src) = default;
ProtocolServerAdapter & operator =(ProtocolServerAdapter && src) = default;
~ProtocolServerAdapter() {}
ProtocolServerAdapter(std::unique_ptr<Poco::Net::TCPServer> tcp_server_);
ProtocolServerAdapter(const char * port_name_, std::unique_ptr<Poco::Net::TCPServer> tcp_server_);
#if USE_GRPC
ProtocolServerAdapter(std::unique_ptr<GRPCServer> grpc_server_);
ProtocolServerAdapter(const char * port_name_, std::unique_ptr<GRPCServer> grpc_server_);
#endif
/// Starts the server. A new thread will be created that waits for and accepts incoming connections.
@ -37,6 +36,11 @@ public:
/// Returns the number of currently handled connections.
size_t currentConnections() const { return impl->currentConnections(); }
/// Returns the number of current threads.
size_t currentThreads() const { return impl->currentThreads(); }
const std::string & getPortName() const { return port_name; }
private:
class Impl
{
@ -45,10 +49,12 @@ private:
virtual void start() = 0;
virtual void stop() = 0;
virtual size_t currentConnections() const = 0;
virtual size_t currentThreads() const = 0;
};
class TCPServerAdapterImpl;
class GRPCServerAdapterImpl;
std::string port_name;
std::unique_ptr<Impl> impl;
};