Add more metrics for Keeper

This commit is contained in:
Antonio Andelic 2022-11-09 15:51:41 +01:00
parent c92ae5a385
commit 1ed3930809
13 changed files with 125 additions and 68 deletions

View File

@ -281,50 +281,56 @@ void Keeper::defineOptions(Poco::Util::OptionSet & options)
struct Keeper::KeeperHTTPContext : public IHTTPContext
{
explicit KeeperHTTPContext(const TinyContext & context_)
: context(context_)
{}
uint64_t getMaxHstsAge() const override
{
return 0;
return context.getConfigRef().getUInt64("keeper_server.hsts_max_age", 0);
}
uint64_t getMaxUriSize() const override
{
return 1048576;
return context.getConfigRef().getUInt64("keeper_server.http_max_uri_size", 1048576);
}
uint64_t getMaxFields() const override
{
return 1000000;
return context.getConfigRef().getUInt64("keeper_server.http_max_fields", 1000000);
}
uint64_t getMaxFieldNameSize() const override
{
return 1048576;
return context.getConfigRef().getUInt64("keeper_server.http_max_field_name_size", 1048576);
}
uint64_t getMaxFieldValueSize() const override
{
return 1048576;
return context.getConfigRef().getUInt64("keeper_server.http_max_field_value_size", 1048576);
}
uint64_t getMaxChunkSize() const override
{
return 100_GiB;
return context.getConfigRef().getUInt64("keeper_server.http_max_chunk_size", 100_GiB);
}
Poco::Timespan getReceiveTimeout() const override
{
return DEFAULT_HTTP_READ_BUFFER_TIMEOUT;
return context.getConfigRef().getUInt64("keeper_server.http_receive_timeout", DEFAULT_HTTP_READ_BUFFER_TIMEOUT);
}
Poco::Timespan getSendTimeout() const override
{
return DEFAULT_HTTP_READ_BUFFER_TIMEOUT;
return context.getConfigRef().getUInt64("keeper_server.http_send_timeout", DEFAULT_HTTP_READ_BUFFER_TIMEOUT);
}
const TinyContext & context;
};
HTTPContextPtr Keeper::httpContext()
{
return std::make_shared<KeeperHTTPContext>();
return std::make_shared<KeeperHTTPContext>(tiny_context);
}
int Keeper::main(const std::vector<std::string> & /*args*/)
@ -419,7 +425,6 @@ int Keeper::main(const std::vector<std::string> & /*args*/)
}
);
std::vector<std::string> listen_hosts = DB::getMultipleValuesFromConfig(config(), "", "listen_host");
bool listen_try = config().getBool("listen_try", false);

View File

@ -66,7 +66,7 @@ private:
TinyContext tiny_context;
struct KeeperHTTPContext;
static HTTPContextPtr httpContext();
HTTPContextPtr httpContext();
Poco::Net::SocketAddress socketBindListen(Poco::Net::ServerSocket & socket, const std::string & host, UInt16 port, [[maybe_unused]] bool secure = false) const;

View File

@ -1927,58 +1927,9 @@ std::unique_ptr<TCPProtocolStackFactory> Server::buildProtocolStackFromConfig(
return stack;
}
struct Server::ServerHTTPContext : public IHTTPContext
{
explicit ServerHTTPContext(ContextPtr context_)
: context(Context::createCopy(context_))
{}
uint64_t getMaxHstsAge() const override
{
return context->getSettingsRef().hsts_max_age;
}
uint64_t getMaxUriSize() const override
{
return context->getSettingsRef().http_max_uri_size;
}
uint64_t getMaxFields() const override
{
return context->getSettingsRef().http_max_fields;
}
uint64_t getMaxFieldNameSize() const override
{
return context->getSettingsRef().http_max_field_name_size;
}
uint64_t getMaxFieldValueSize() const override
{
return context->getSettingsRef().http_max_field_value_size;
}
uint64_t getMaxChunkSize() const override
{
return context->getSettingsRef().http_max_chunk_size;
}
Poco::Timespan getReceiveTimeout() const override
{
return context->getSettingsRef().http_receive_timeout;
}
Poco::Timespan getSendTimeout() const override
{
return context->getSettingsRef().http_send_timeout;
}
ContextPtr context;
};
HTTPContextPtr Server::httpContext() const
{
return std::make_shared<ServerHTTPContext>(context());
return std::make_shared<HTTPContext>(context());
}
void Server::createServers(

View File

@ -73,7 +73,6 @@ private:
/// Updated/recent config, to compare http_handlers
ConfigurationPtr latest_config;
struct ServerHTTPContext;
HTTPContextPtr httpContext() const;
Poco::Net::SocketAddress socketBindListen(

View File

@ -236,7 +236,7 @@ int IBridge::main(const std::vector<std::string> & /*args*/)
SensitiveDataMasker::setInstance(std::make_unique<SensitiveDataMasker>(config(), "query_masking_rules"));
auto server = HTTPServer(
context,
std::make_shared<HTTPContext>(context),
getHandlerFactoryPtr(context),
server_pool,
socket,

View File

@ -433,6 +433,15 @@ The server successfully detected this situation and will download merged part fr
M(KeeperSnapshotApplysFailed, "Number of failed snapshot applying")\
M(KeeperReadSnapshot, "Number of snapshot read(serialization)")\
M(KeeperSaveSnapshot, "Number of snapshot save")\
M(KeeperCreateRequest, "Number of create requests")\
M(KeeperRemoveRequest, "Number of remove requests")\
M(KeeperSetRequest, "Number of set requests")\
M(KeeperCheckRequest, "Number of check requests")\
M(KeeperMultiRequest, "Number of multi requests")\
M(KeeperMultiReadRequest, "Number of multi read requests")\
M(KeeperGetRequest, "Number of get requests")\
M(KeeperListRequest, "Number of list requests")\
M(KeeperExistsRequest, "Number of exists requests")\
\
M(OverflowBreak, "Number of times, data processing was cancelled by query complexity limitation with setting '*_overflow_mode' = 'break' and the result is incomplete.") \
M(OverflowThrow, "Number of times, data processing was cancelled by query complexity limitation with setting '*_overflow_mode' = 'throw' and exception was thrown.") \

View File

@ -8,7 +8,7 @@
namespace DB
{
void updateKeeperInformation(const KeeperDispatcher & keeper_dispatcher, AsynchronousMetricValues & new_values)
void updateKeeperInformation(KeeperDispatcher & keeper_dispatcher, AsynchronousMetricValues & new_values)
{
#if USE_NURAFT
size_t is_leader = 0;
@ -87,6 +87,23 @@ void updateKeeperInformation(const KeeperDispatcher & keeper_dispatcher, Asynchr
new_values["KeeperPathsWatched"] = paths_watched;
new_values["KeeperSnapshotDirSize"] = snapshot_dir_size;
new_values["KeeperLogDirSize"] = log_dir_size;
auto keeper_log_info = keeper_dispatcher.getKeeperLogInfo();
new_values["KeeperLastLogIdx"] = keeper_log_info.last_log_idx;
new_values["KeeperLastLogTerm"] = keeper_log_info.last_log_term;
new_values["KeeperLastCommittedLogIdx"] = keeper_log_info.last_committed_log_idx;
new_values["KeeperTargetCommitLogIdx"] = keeper_log_info.target_committed_log_idx;
new_values["KeeperLastSnapshotIdx"] = keeper_log_info.last_snapshot_idx;
auto & keeper_connection_stats = keeper_dispatcher.getKeeperConnectionStats();
new_values["KeeperMinLatency"] = keeper_connection_stats.getMinLatency();
new_values["KeeperMaxLatency"] = keeper_connection_stats.getMaxLatency();
new_values["KeeperAvgLatency"] = keeper_connection_stats.getAvgLatency();
new_values["KeeperPacketsReceived"] = keeper_connection_stats.getAvgLatency();
new_values["KeeperPacketsSent"] = keeper_connection_stats.getAvgLatency();
#endif
}

View File

@ -7,7 +7,7 @@ namespace DB
{
class KeeperDispatcher;
void updateKeeperInformation(const KeeperDispatcher & keeper_dispatcher, AsynchronousMetricValues & new_values);
void updateKeeperInformation(KeeperDispatcher & keeper_dispatcher, AsynchronousMetricValues & new_values);
class KeeperAsynchronousMetrics : public AsynchronousMetrics
{

View File

@ -15,6 +15,7 @@
#include <Common/logger_useful.h>
#include <Common/setThreadName.h>
#include <Common/LockMemoryExceptionInThread.h>
#include <Common/ProfileEvents.h>
#include <Coordination/pathUtils.h>
#include <Coordination/KeeperConstants.h>
@ -27,6 +28,19 @@
#include <base/defines.h>
#include <filesystem>
namespace ProfileEvents
{
extern const Event KeeperCreateRequest;
extern const Event KeeperRemoveRequest;
extern const Event KeeperSetRequest;
extern const Event KeeperCheckRequest;
extern const Event KeeperMultiRequest;
extern const Event KeeperMultiReadRequest;
extern const Event KeeperGetRequest;
extern const Event KeeperListRequest;
extern const Event KeeperExistsRequest;
}
namespace DB
{
@ -865,6 +879,7 @@ struct KeeperStorageCreateRequestProcessor final : public KeeperStorageRequestPr
std::vector<KeeperStorage::Delta>
preprocess(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time, uint64_t & digest, const KeeperContext & keeper_context) const override
{
ProfileEvents::increment(ProfileEvents::KeeperCreateRequest);
Coordination::ZooKeeperCreateRequest & request = dynamic_cast<Coordination::ZooKeeperCreateRequest &>(*zk_request);
std::vector<KeeperStorage::Delta> new_deltas;
@ -986,6 +1001,7 @@ struct KeeperStorageGetRequestProcessor final : public KeeperStorageRequestProce
std::vector<KeeperStorage::Delta>
preprocess(KeeperStorage & storage, int64_t zxid, int64_t /*session_id*/, int64_t /*time*/, uint64_t & /*digest*/, const KeeperContext & /*keeper_context*/) const override
{
ProfileEvents::increment(ProfileEvents::KeeperGetRequest);
Coordination::ZooKeeperGetRequest & request = dynamic_cast<Coordination::ZooKeeperGetRequest &>(*zk_request);
if (request.path == Coordination::keeper_api_version_path)
@ -1040,6 +1056,7 @@ struct KeeperStorageGetRequestProcessor final : public KeeperStorageRequestProce
Coordination::ZooKeeperResponsePtr processLocal(KeeperStorage & storage, int64_t zxid) const override
{
ProfileEvents::increment(ProfileEvents::KeeperGetRequest);
return processImpl<true>(storage, zxid);
}
};
@ -1055,6 +1072,7 @@ struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestPr
std::vector<KeeperStorage::Delta>
preprocess(KeeperStorage & storage, int64_t zxid, int64_t /*session_id*/, int64_t /*time*/, uint64_t & digest, const KeeperContext & keeper_context) const override
{
ProfileEvents::increment(ProfileEvents::KeeperRemoveRequest);
Coordination::ZooKeeperRemoveRequest & request = dynamic_cast<Coordination::ZooKeeperRemoveRequest &>(*zk_request);
std::vector<KeeperStorage::Delta> new_deltas;
@ -1145,6 +1163,7 @@ struct KeeperStorageExistsRequestProcessor final : public KeeperStorageRequestPr
std::vector<KeeperStorage::Delta>
preprocess(KeeperStorage & storage, int64_t zxid, int64_t /*session_id*/, int64_t /*time*/, uint64_t & /*digest*/, const KeeperContext & /*keeper_context*/) const override
{
ProfileEvents::increment(ProfileEvents::KeeperExistsRequest);
Coordination::ZooKeeperExistsRequest & request = dynamic_cast<Coordination::ZooKeeperExistsRequest &>(*zk_request);
if (!storage.uncommitted_state.getNode(request.path))
@ -1194,6 +1213,7 @@ struct KeeperStorageExistsRequestProcessor final : public KeeperStorageRequestPr
Coordination::ZooKeeperResponsePtr processLocal(KeeperStorage & storage, int64_t zxid) const override
{
ProfileEvents::increment(ProfileEvents::KeeperExistsRequest);
return processImpl<true>(storage, zxid);
}
};
@ -1209,6 +1229,7 @@ struct KeeperStorageSetRequestProcessor final : public KeeperStorageRequestProce
std::vector<KeeperStorage::Delta>
preprocess(KeeperStorage & storage, int64_t zxid, int64_t /*session_id*/, int64_t time, uint64_t & digest, const KeeperContext & keeper_context) const override
{
ProfileEvents::increment(ProfileEvents::KeeperSetRequest);
Coordination::ZooKeeperSetRequest & request = dynamic_cast<Coordination::ZooKeeperSetRequest &>(*zk_request);
std::vector<KeeperStorage::Delta> new_deltas;
@ -1301,6 +1322,7 @@ struct KeeperStorageListRequestProcessor final : public KeeperStorageRequestProc
std::vector<KeeperStorage::Delta>
preprocess(KeeperStorage & storage, int64_t zxid, int64_t /*session_id*/, int64_t /*time*/, uint64_t & /*digest*/, const KeeperContext & /*keeper_context*/) const override
{
ProfileEvents::increment(ProfileEvents::KeeperListRequest);
Coordination::ZooKeeperListRequest & request = dynamic_cast<Coordination::ZooKeeperListRequest &>(*zk_request);
if (!storage.uncommitted_state.getNode(request.path))
@ -1387,6 +1409,7 @@ struct KeeperStorageListRequestProcessor final : public KeeperStorageRequestProc
Coordination::ZooKeeperResponsePtr processLocal(KeeperStorage & storage, int64_t zxid) const override
{
ProfileEvents::increment(ProfileEvents::KeeperListRequest);
return processImpl<true>(storage, zxid);
}
};
@ -1402,6 +1425,7 @@ struct KeeperStorageCheckRequestProcessor final : public KeeperStorageRequestPro
std::vector<KeeperStorage::Delta>
preprocess(KeeperStorage & storage, int64_t zxid, int64_t /*session_id*/, int64_t /*time*/, uint64_t & /*digest*/, const KeeperContext & /*keeper_context*/) const override
{
ProfileEvents::increment(ProfileEvents::KeeperCheckRequest);
Coordination::ZooKeeperCheckRequest & request = dynamic_cast<Coordination::ZooKeeperCheckRequest &>(*zk_request);
if (!storage.uncommitted_state.getNode(request.path))
@ -1463,6 +1487,7 @@ struct KeeperStorageCheckRequestProcessor final : public KeeperStorageRequestPro
Coordination::ZooKeeperResponsePtr processLocal(KeeperStorage & storage, int64_t zxid) const override
{
ProfileEvents::increment(ProfileEvents::KeeperCheckRequest);
return processImpl<true>(storage, zxid);
}
};
@ -1689,6 +1714,7 @@ struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestPro
std::vector<KeeperStorage::Delta>
preprocess(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time, uint64_t & digest, const KeeperContext & keeper_context) const override
{
ProfileEvents::increment(ProfileEvents::KeeperMultiRequest);
std::vector<Coordination::Error> response_errors;
response_errors.reserve(concrete_requests.size());
uint64_t current_digest = digest;
@ -1756,6 +1782,7 @@ struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestPro
Coordination::ZooKeeperResponsePtr processLocal(KeeperStorage & storage, int64_t zxid) const override
{
ProfileEvents::increment(ProfileEvents::KeeperMultiReadRequest);
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
Coordination::ZooKeeperMultiResponse & response = dynamic_cast<Coordination::ZooKeeperMultiResponse &>(*response_ptr);

View File

@ -18,6 +18,8 @@
#include <Storages/MergeTree/ParallelReplicasReadingCoordinator.h>
#include <Storages/ColumnsDescription.h>
#include <Server/HTTP/HTTPContext.h>
#include "config.h"
@ -1062,4 +1064,53 @@ private:
DiskSelectorPtr getDiskSelector(std::lock_guard<std::mutex> & /* lock */) const;
};
struct HTTPContext : public IHTTPContext
{
explicit HTTPContext(ContextPtr context_)
: context(Context::createCopy(context_))
{}
uint64_t getMaxHstsAge() const override
{
return context->getSettingsRef().hsts_max_age;
}
uint64_t getMaxUriSize() const override
{
return context->getSettingsRef().http_max_uri_size;
}
uint64_t getMaxFields() const override
{
return context->getSettingsRef().http_max_fields;
}
uint64_t getMaxFieldNameSize() const override
{
return context->getSettingsRef().http_max_field_name_size;
}
uint64_t getMaxFieldValueSize() const override
{
return context->getSettingsRef().http_max_field_value_size;
}
uint64_t getMaxChunkSize() const override
{
return context->getSettingsRef().http_max_chunk_size;
}
Poco::Timespan getReceiveTimeout() const override
{
return context->getSettingsRef().http_receive_timeout;
}
Poco::Timespan getSendTimeout() const override
{
return context->getSettingsRef().http_send_timeout;
}
ContextPtr context;
};
}

View File

@ -52,7 +52,7 @@ ServerAsynchronousMetrics::ServerAsynchronousMetrics(
, WithContext(global_context_)
, heavy_metric_update_period(heavy_metrics_update_period_seconds)
{}
void ServerAsynchronousMetrics::updateImpl(AsynchronousMetricValues & new_values)
{
if (auto mark_cache = getContext()->getMarkCache())

View File

@ -5,7 +5,6 @@
#include <Server/HTTP/HTTPRequestHandlerFactory.h>
#include <Server/HTTPHandlerRequestFilter.h>
#include <Common/StringUtils/StringUtils.h>
#include <Daemon/BaseDaemon.h>
#include <Common/logger_useful.h>
#include <Poco/Util/AbstractConfiguration.h>

View File

@ -1,7 +1,6 @@
#pragma once
#include <Server/HTTP/HTTPRequestHandler.h>
#include <Daemon/BaseDaemon.h>
#include "PrometheusMetricsWriter.h"