Add keeper monitoring data

This commit is contained in:
lingpeng0314 2022-06-10 15:49:46 +08:00
parent 1a67740cd3
commit f06b19bdbf
6 changed files with 172 additions and 1 deletions

View File

@ -93,6 +93,8 @@
M(CacheFileSegments, "Number of existing cache file segments") \
M(CacheDetachedFileSegments, "Number of existing detached cache file segments") \
M(S3Requests, "S3 requests") \
M(KeeperAliveConnections, "Number of alive connections") \
M(KeeperOutstandingRequets, "Number of outstanding requests") \
namespace CurrentMetrics
{

View File

@ -343,7 +343,20 @@
\
M(ScalarSubqueriesGlobalCacheHit, "Number of times a read from a scalar subquery was done using the global cache") \
M(ScalarSubqueriesLocalCacheHit, "Number of times a read from a scalar subquery was done using the local cache") \
M(ScalarSubqueriesCacheMiss, "Number of times a read from a scalar subquery was not cached and had to be calculated completely")
M(ScalarSubqueriesCacheMiss, "Number of times a read from a scalar subquery was not cached and had to be calculated completely") \
M(KeeperPacketsSent, "Packets sent by keeper server") \
M(KeeperPacketsReceived, "Packets received by keeper server") \
M(KeeperRequestTotal, "Total requests number on keeper server") \
M(KeeperLatency, "Keeper latency") \
M(KeeperCommits, "Number of commit count") \
M(KeeperCommitsFailed, "Number of commit failed count") \
M(KeeperSnapshotCreations, "Number of snapshot creations count")\
M(KeeperSnapshotCreationsFailed, "Number of snapshot creations failed count")\
M(KeeperSnapshotApplys, "Number of snapshot applying")\
M(KeeperSnapshotApplysFailed, "Number of snapshot applying failed")\
M(KeeperReadSnapshot, "Number of snapshot read(serialization)")\
M(KeeperSaveSnapshot, "Number of snapshot save")
namespace ProfileEvents
{

View File

@ -1,5 +1,14 @@
#include <atomic>
#include <Coordination/KeeperConnectionStats.h>
#include <Common/ProfileEvents.h>
/// Profile event counters declared here and incremented by KeeperConnectionStats below.
namespace ProfileEvents
{
    /// Bumped once per packet in incrementPacketsReceived() / incrementPacketsSent().
    extern const Event KeeperPacketsReceived;
    extern const Event KeeperPacketsSent;

    /// Bumped together in updateLatency(): accumulated latency and number of requests.
    extern const Event KeeperLatency;
    extern const Event KeeperRequestTotal;
}
namespace DB
{
@ -40,18 +49,22 @@ uint64_t KeeperConnectionStats::getPacketsSent() const
/// Registers one received packet: increments this object's own counter and the
/// KeeperPacketsReceived profile event.
void KeeperConnectionStats::incrementPacketsReceived()
{
    constexpr auto order = std::memory_order_relaxed;

    packets_received.fetch_add(1, order);
    ProfileEvents::increment(ProfileEvents::KeeperPacketsReceived, 1);
}
/// Registers one sent packet: increments this object's own counter and the
/// KeeperPacketsSent profile event.
void KeeperConnectionStats::incrementPacketsSent()
{
    constexpr auto order = std::memory_order_relaxed;

    packets_sent.fetch_add(1, order);
    ProfileEvents::increment(ProfileEvents::KeeperPacketsSent, 1);
}
void KeeperConnectionStats::updateLatency(uint64_t latency_ms)
{
last_latency.store(latency_ms, std::memory_order_relaxed);
total_latency.fetch_add(latency_ms, std::memory_order_relaxed);
ProfileEvents::increment(ProfileEvents::KeeperLatency, latency_ms);
count.fetch_add(1, std::memory_order_relaxed);
ProfileEvents::increment(ProfileEvents::KeeperRequestTotal, 1);
uint64_t prev_val = min_latency.load(std::memory_order_relaxed);
while (prev_val > latency_ms && !min_latency.compare_exchange_weak(prev_val, latency_ms, std::memory_order_relaxed)) {}

View File

@ -7,6 +7,13 @@
#include <Common/hex.h>
#include <filesystem>
#include <Common/checkStackSize.h>
#include <Common/CurrentMetrics.h>
/// Gauges declared here and maintained by KeeperDispatcher in this file.
namespace CurrentMetrics
{
    /// Requests pushed to the dispatcher queue and not yet popped from it.
    /// NOTE: "Requets" is a misspelling of "Requests"; the name has to match the metric
    /// definition, so any rename must be done in the definition and all usages at once.
    extern const Metric KeeperOutstandingRequets;

    /// Sessions currently registered in the dispatcher.
    extern const Metric KeeperAliveConnections;
}
namespace fs = std::filesystem;
@ -57,6 +64,7 @@ void KeeperDispatcher::requestThread()
{
if (requests_queue->tryPop(request, max_wait))
{
CurrentMetrics::sub(CurrentMetrics::KeeperOutstandingRequets);
if (shutdown_called)
break;
@ -78,6 +86,7 @@ void KeeperDispatcher::requestThread()
/// Trying to get batch requests as fast as possible
if (requests_queue->tryPop(request, 1))
{
CurrentMetrics::sub(CurrentMetrics::KeeperOutstandingRequets);
/// Don't append read request into batch, we have to process them separately
if (!coordination_settings->quorum_reads && request.request->isReadRequest())
{
@ -225,6 +234,7 @@ void KeeperDispatcher::setResponse(int64_t session_id, const Coordination::ZooKe
if (response->xid != Coordination::WATCH_XID && response->getOpNum() == Coordination::OpNum::Close)
{
session_to_response_callback.erase(session_response_callback);
CurrentMetrics::sub(CurrentMetrics::KeeperAliveConnections);
}
}
}
@ -259,6 +269,7 @@ bool KeeperDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr & requ
{
throw Exception("Cannot push request to queue within operation timeout", ErrorCodes::TIMEOUT_EXCEEDED);
}
CurrentMetrics::add(CurrentMetrics::KeeperOutstandingRequets);
return true;
}
@ -350,6 +361,7 @@ void KeeperDispatcher::shutdown()
/// Set session expired for all pending requests
while (requests_queue && requests_queue->tryPop(request_for_session))
{
CurrentMetrics::sub(CurrentMetrics::KeeperOutstandingRequets);
auto response = request_for_session.request->makeResponse();
response->error = Coordination::Error::ZSESSIONEXPIRED;
setResponse(request_for_session.session_id, response);
@ -358,6 +370,7 @@ void KeeperDispatcher::shutdown()
/// Clear all registered sessions
std::lock_guard lock(session_to_response_callback_mutex);
session_to_response_callback.clear();
CurrentMetrics::set(CurrentMetrics::KeeperAliveConnections, 0);
}
catch (...)
{
@ -382,6 +395,7 @@ void KeeperDispatcher::registerSession(int64_t session_id, ZooKeeperResponseCall
std::lock_guard lock(session_to_response_callback_mutex);
if (!session_to_response_callback.try_emplace(session_id, callback).second)
throw Exception(DB::ErrorCodes::LOGICAL_ERROR, "Session with id {} already registered in dispatcher", session_id);
CurrentMetrics::add(CurrentMetrics::KeeperAliveConnections);
}
void KeeperDispatcher::sessionCleanerTask()
@ -414,6 +428,7 @@ void KeeperDispatcher::sessionCleanerTask()
std::lock_guard lock(push_request_mutex);
if (!requests_queue->push(std::move(request_info)))
LOG_INFO(log, "Cannot push close request to queue while cleaning outdated sessions");
CurrentMetrics::add(CurrentMetrics::KeeperOutstandingRequets);
}
/// Remove session from registered sessions
@ -437,7 +452,10 @@ void KeeperDispatcher::finishSession(int64_t session_id)
std::lock_guard lock(session_to_response_callback_mutex);
auto session_it = session_to_response_callback.find(session_id);
if (session_it != session_to_response_callback.end())
{
session_to_response_callback.erase(session_it);
CurrentMetrics::sub(CurrentMetrics::KeeperAliveConnections);
}
}
void KeeperDispatcher::addErrorResponses(const KeeperStorage::RequestsForSessions & requests_for_sessions, Coordination::Error error)
@ -520,6 +538,7 @@ int64_t KeeperDispatcher::getSessionID(int64_t session_timeout_ms)
std::lock_guard lock(push_request_mutex);
if (!requests_queue->tryPush(std::move(request_info), session_timeout_ms))
throw Exception("Cannot push session id request to queue within session timeout", ErrorCodes::TIMEOUT_EXCEEDED);
CurrentMetrics::add(CurrentMetrics::KeeperOutstandingRequets);
}
if (future.wait_for(std::chrono::milliseconds(session_timeout_ms)) != std::future_status::ready)

View File

@ -8,8 +8,21 @@
#include <sys/mman.h>
#include "Common/ZooKeeper/ZooKeeperCommon.h"
#include <Common/ZooKeeper/ZooKeeperIO.h>
#include <Common/ProfileEvents.h>
#include "Coordination/KeeperStorage.h"
/// Profile event counters declared here and incremented by KeeperStateMachine in this file.
namespace ProfileEvents
{
    /// Log entry commits: successful ones and ones that failed to push a response.
    extern const Event KeeperCommits;
    extern const Event KeeperCommitsFailed;

    /// Snapshot creation outcomes.
    extern const Event KeeperSnapshotCreations;
    extern const Event KeeperSnapshotCreationsFailed;

    /// Snapshot application outcomes.
    extern const Event KeeperSnapshotApplys;
    extern const Event KeeperSnapshotApplysFailed;

    /// Snapshot object transfer: objects read and objects saved.
    extern const Event KeeperReadSnapshot;
    extern const Event KeeperSaveSnapshot;
}
namespace DB
{
@ -219,9 +232,12 @@ nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, n
LOG_DEBUG(log, "Session ID response {} with timeout {}", session_id, session_id_request.session_timeout_ms);
response->session_id = session_id;
if (!responses_queue.push(response_for_session))
{
ProfileEvents::increment(ProfileEvents::KeeperCommitsFailed);
throw Exception(ErrorCodes::SYSTEM_ERROR, "Could not push response with session id {} into responses queue", session_id);
}
}
}
else
{
std::lock_guard lock(storage_and_responses_lock);
@ -229,10 +245,13 @@ nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, n
request_for_session.request, request_for_session.session_id, request_for_session.zxid);
for (auto & response_for_session : responses_for_sessions)
if (!responses_queue.push(response_for_session))
{
ProfileEvents::increment(ProfileEvents::KeeperCommitsFailed);
throw Exception(
ErrorCodes::SYSTEM_ERROR,
"Could not push response with session id {} into responses queue",
response_for_session.session_id);
}
if (digest_enabled && request_for_session.digest)
{
@ -240,6 +259,7 @@ nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, n
}
}
ProfileEvents::increment(ProfileEvents::KeeperCommits);
last_committed_idx = log_idx;
return nullptr;
}
@ -251,11 +271,14 @@ bool KeeperStateMachine::apply_snapshot(nuraft::snapshot & s)
{ /// save snapshot into memory
std::lock_guard lock(snapshots_lock);
if (s.get_last_log_idx() != latest_snapshot_meta->get_last_log_idx())
{
ProfileEvents::increment(ProfileEvents::KeeperSnapshotApplysFailed);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Required to apply snapshot with last log index {}, but our last log index is {}",
s.get_last_log_idx(),
latest_snapshot_meta->get_last_log_idx());
}
latest_snapshot_ptr = latest_snapshot_buf;
}
@ -268,6 +291,7 @@ bool KeeperStateMachine::apply_snapshot(nuraft::snapshot & s)
cluster_config = snapshot_deserialization_result.cluster_config;
}
ProfileEvents::increment(ProfileEvents::KeeperSnapshotApplys);
last_committed_idx = s.get_last_log_idx();
return true;
}
@ -332,6 +356,7 @@ void KeeperStateMachine::create_snapshot(nuraft::snapshot & s, nuraft::async_res
}
latest_snapshot_path = path;
latest_snapshot_meta = snapshot->snapshot_meta;
ProfileEvents::increment(ProfileEvents::KeeperSnapshotCreations);
LOG_DEBUG(log, "Created persistent snapshot {} with path {}", latest_snapshot_meta->get_last_log_idx(), path);
}
@ -347,6 +372,7 @@ void KeeperStateMachine::create_snapshot(nuraft::snapshot & s, nuraft::async_res
}
catch (...)
{
ProfileEvents::increment(ProfileEvents::KeeperSnapshotCreationsFailed);
LOG_TRACE(log, "Exception happened during snapshot");
tryLogCurrentException(log);
ret = false;
@ -380,6 +406,7 @@ void KeeperStateMachine::save_logical_snp_obj(
latest_snapshot_meta = cloned_meta;
LOG_DEBUG(log, "Saved snapshot {} to path {}", s.get_last_log_idx(), result_path);
obj_id++;
ProfileEvents::increment(ProfileEvents::KeeperSaveSnapshot);
}
catch (...)
{
@ -441,6 +468,7 @@ int KeeperStateMachine::read_logical_snp_obj(
return -1;
}
is_last_obj = true;
ProfileEvents::increment(ProfileEvents::KeeperReadSnapshot);
return 1;
}

View File

@ -4,6 +4,8 @@
#include <Interpreters/JIT/CompiledExpressionCache.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Interpreters/Context.h>
#include <Coordination/Keeper4LWInfo.h>
#include <Coordination/KeeperDispatcher.h>
#include <Common/Exception.h>
#include <Common/setThreadName.h>
#include <Common/CurrentMetrics.h>
@ -11,6 +13,8 @@
#include <Common/filesystemHelpers.h>
#include <Common/FileCacheFactory.h>
#include <Common/IFileCache.h>
#include <Common/getCurrentProcessFDCount.h>
#include <Common/getMaxFileDescriptorCount.h>
#include <Server/ProtocolServerAdapter.h>
#include <Storages/MarkCache.h>
#include <Storages/StorageMergeTree.h>
@ -1464,6 +1468,98 @@ void AsynchronousMetrics::update(std::chrono::system_clock::time_point update_ti
new_values[name] = server_metric.current_threads;
}
}
#if USE_NURAFT
    {
        /// Keeper gauges exported on every update. Every field starts at zero, so when the
        /// dispatcher is missing or the server is not active the metrics are still published,
        /// with zero values.
        struct KeeperGauges
        {
            size_t is_leader = 0;
            size_t is_follower = 0;
            size_t is_observer = 0;
            size_t is_standalone = 0;
            size_t znode_count = 0;
            size_t watch_count = 0;
            size_t ephemerals_count = 0;
            size_t approximate_data_size = 0;
            size_t key_arena_size = 0;
            size_t latest_snapshot_size = 0;
            size_t open_file_descriptor_count = 0;
            size_t max_file_descriptor_count = 0;
            size_t followers = 0;
            size_t synced_followers = 0;
            size_t zxid = 0;
            size_t session_with_watches = 0;
            size_t paths_watched = 0;
            size_t snapshot_dir_size = 0;
            size_t log_dir_size = 0;
        };

        try
        {
            auto dispatcher = getContext()->getKeeperDispatcher();
            KeeperGauges gauges;

            if (dispatcher && dispatcher->isServerActive())
            {
                auto info = dispatcher->getKeeper4LWInfo();

                /// Role flags are exported as 0/1.
                gauges.is_standalone = info.is_standalone ? 1 : 0;
                gauges.is_leader = info.is_leader ? 1 : 0;
                gauges.is_observer = info.is_observer ? 1 : 0;
                gauges.is_follower = info.is_follower ? 1 : 0;

                gauges.zxid = info.last_zxid;

                const auto & state_machine = dispatcher->getStateMachine();
                gauges.znode_count = state_machine.getNodesCount();
                gauges.watch_count = state_machine.getTotalWatchesCount();
                gauges.ephemerals_count = state_machine.getTotalEphemeralNodesCount();
                gauges.approximate_data_size = state_machine.getApproximateDataSize();
                gauges.key_arena_size = state_machine.getKeyArenaSize();
                gauges.latest_snapshot_size = state_machine.getLatestSnapshotBufSize();
                gauges.session_with_watches = state_machine.getSessionsWithWatchesCount();
                gauges.paths_watched = state_machine.getWatchedPathsCount();

                gauges.snapshot_dir_size = dispatcher->getSnapDirSize();
                gauges.log_dir_size = dispatcher->getLogDirSize();

#if defined(__linux__) || defined(__APPLE__)
                gauges.open_file_descriptor_count = getCurrentProcessFDCount();
                gauges.max_file_descriptor_count = getMaxFileDescriptorCount();
#endif

                /// Follower counts are taken only on the leader; elsewhere they stay zero.
                if (info.is_leader)
                {
                    gauges.followers = info.follower_count;
                    gauges.synced_followers = info.synced_follower_count;
                }
            }

            /// Publish only after everything was collected: if any getter above throws,
            /// none of the Keeper values is written for this update.
            new_values["KeeperIsLeader"] = gauges.is_leader;
            new_values["KeeperIsFollower"] = gauges.is_follower;
            new_values["KeeperIsObserver"] = gauges.is_observer;
            new_values["KeeperIsStandalone"] = gauges.is_standalone;

            new_values["KeeperZnodeCount"] = gauges.znode_count;
            new_values["KeeperWatchCount"] = gauges.watch_count;
            new_values["KeeperEphemeralsCount"] = gauges.ephemerals_count;

            new_values["KeeperApproximateDataSize"] = gauges.approximate_data_size;
            new_values["KeeperKeyArenaSize"] = gauges.key_arena_size;
            new_values["KeeperLatestSnapshotSize"] = gauges.latest_snapshot_size;

            new_values["KeeperOpenFileDescriptorCount"] = gauges.open_file_descriptor_count;
            new_values["KeeperMaxFileDescriptorCount"] = gauges.max_file_descriptor_count;

            new_values["KeeperFollowers"] = gauges.followers;
            new_values["KeeperSyncedFollowers"] = gauges.synced_followers;
            new_values["KeeperZxid"] = gauges.zxid;
            new_values["KeeperSessionWithWatches"] = gauges.session_with_watches;
            new_values["KeeperPathsWatched"] = gauges.paths_watched;
            new_values["KeeperSnapshotDirSize"] = gauges.snapshot_dir_size;
            new_values["KeeperLogDirSize"] = gauges.log_dir_size;
        }
        catch (...)
        {
            /// Keeper metrics are best-effort: log the failure and continue with the other metrics.
            tryLogCurrentException(__PRETTY_FUNCTION__);
        }
    }
#endif
#if USE_JEMALLOC
// 'epoch' is a special mallctl -- it updates the statistics. Without it, all