From f06b19bdbfc02e8dc85f564146208bbd29f38771 Mon Sep 17 00:00:00 2001 From: lingpeng0314 Date: Fri, 10 Jun 2022 15:49:46 +0800 Subject: [PATCH] Add keeper monitoring data --- src/Common/CurrentMetrics.cpp | 2 + src/Common/ProfileEvents.cpp | 15 +++- src/Coordination/KeeperConnectionStats.cpp | 13 +++ src/Coordination/KeeperDispatcher.cpp | 19 +++++ src/Coordination/KeeperStateMachine.cpp | 28 +++++++ src/Interpreters/AsynchronousMetrics.cpp | 96 ++++++++++++++++++++++ 6 files changed, 172 insertions(+), 1 deletion(-) diff --git a/src/Common/CurrentMetrics.cpp b/src/Common/CurrentMetrics.cpp index fa9c60c6f79..f479e4cc140 100644 --- a/src/Common/CurrentMetrics.cpp +++ b/src/Common/CurrentMetrics.cpp @@ -93,6 +93,8 @@ M(CacheFileSegments, "Number of existing cache file segments") \ M(CacheDetachedFileSegments, "Number of existing detached cache file segments") \ M(S3Requests, "S3 requests") \ + M(KeeperAliveConnections, "Number of alive connections") \ + M(KeeperOutstandingRequets, "Number of outstanding requests") \ namespace CurrentMetrics { diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp index b8e552f6023..a2886330ce9 100644 --- a/src/Common/ProfileEvents.cpp +++ b/src/Common/ProfileEvents.cpp @@ -343,7 +343,20 @@ \ M(ScalarSubqueriesGlobalCacheHit, "Number of times a read from a scalar subquery was done using the global cache") \ M(ScalarSubqueriesLocalCacheHit, "Number of times a read from a scalar subquery was done using the local cache") \ - M(ScalarSubqueriesCacheMiss, "Number of times a read from a scalar subquery was not cached and had to be calculated completely") + M(ScalarSubqueriesCacheMiss, "Number of times a read from a scalar subquery was not cached and had to be calculated completely") \ + M(KeeperPacketsSent, "Packets sent by keeper server") \ + M(KeeperPacketsReceived, "Packets received by keeper server") \ + M(KeeperRequestTotal, "Total requests number on keeper server") \ + M(KeeperLatency, "Keeper latency") \ + M(KeeperCommits, "Number of commit count") \ + M(KeeperCommitsFailed, "Number of commit failed count") \ + M(KeeperSnapshotCreations, "Number of snapshot creations count")\ + M(KeeperSnapshotCreationsFailed, "Number of snapshot creations failed count")\ + M(KeeperSnapshotApplys, "Number of snapshot applying")\ + M(KeeperSnapshotApplysFailed, "Number of snapshot applying failed")\ + M(KeeperReadSnapshot, "Number of snapshot read(serialization)")\ + M(KeeperSaveSnapshot, "Number of snapshot save") + namespace ProfileEvents { diff --git a/src/Coordination/KeeperConnectionStats.cpp b/src/Coordination/KeeperConnectionStats.cpp index b4edfe45159..71f24f1ed02 100644 --- a/src/Coordination/KeeperConnectionStats.cpp +++ b/src/Coordination/KeeperConnectionStats.cpp @@ -1,5 +1,14 @@ #include #include +#include + +namespace ProfileEvents +{ + extern const Event KeeperPacketsSent; + extern const Event KeeperPacketsReceived; + extern const Event KeeperRequestTotal; + extern const Event KeeperLatency; +} namespace DB { @@ -40,18 +49,22 @@ uint64_t KeeperConnectionStats::getPacketsSent() const void KeeperConnectionStats::incrementPacketsReceived() { packets_received.fetch_add(1, std::memory_order_relaxed); + ProfileEvents::increment(ProfileEvents::KeeperPacketsReceived, 1); } void KeeperConnectionStats::incrementPacketsSent() { packets_sent.fetch_add(1, std::memory_order_relaxed); + ProfileEvents::increment(ProfileEvents::KeeperPacketsSent, 1); } void KeeperConnectionStats::updateLatency(uint64_t latency_ms) { last_latency.store(latency_ms, std::memory_order_relaxed); total_latency.fetch_add(latency_ms, std::memory_order_relaxed); + ProfileEvents::increment(ProfileEvents::KeeperLatency, latency_ms); count.fetch_add(1, std::memory_order_relaxed); + ProfileEvents::increment(ProfileEvents::KeeperRequestTotal, 1); uint64_t prev_val = min_latency.load(std::memory_order_relaxed); while (prev_val > latency_ms && !min_latency.compare_exchange_weak(prev_val, latency_ms, std::memory_order_relaxed)) {} diff --git a/src/Coordination/KeeperDispatcher.cpp b/src/Coordination/KeeperDispatcher.cpp index 9ad5fe9e8ed..4798e0995a2 100644 --- a/src/Coordination/KeeperDispatcher.cpp +++ b/src/Coordination/KeeperDispatcher.cpp @@ -7,6 +7,13 @@ #include #include #include +#include + +namespace CurrentMetrics +{ + extern const Metric KeeperAliveConnections; + extern const Metric KeeperOutstandingRequets; +} namespace fs = std::filesystem; @@ -57,6 +64,7 @@ void KeeperDispatcher::requestThread() { if (requests_queue->tryPop(request, max_wait)) { + CurrentMetrics::sub(CurrentMetrics::KeeperOutstandingRequets); if (shutdown_called) break; @@ -78,6 +86,7 @@ void KeeperDispatcher::requestThread() /// Trying to get batch requests as fast as possible if (requests_queue->tryPop(request, 1)) { + CurrentMetrics::sub(CurrentMetrics::KeeperOutstandingRequets); /// Don't append read request into batch, we have to process them separately if (!coordination_settings->quorum_reads && request.request->isReadRequest()) { @@ -225,6 +234,7 @@ void KeeperDispatcher::setResponse(int64_t session_id, const Coordination::ZooKe if (response->xid != Coordination::WATCH_XID && response->getOpNum() == Coordination::OpNum::Close) { session_to_response_callback.erase(session_response_callback); + CurrentMetrics::sub(CurrentMetrics::KeeperAliveConnections); } } } @@ -259,6 +269,7 @@ bool KeeperDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr & requ { throw Exception("Cannot push request to queue within operation timeout", ErrorCodes::TIMEOUT_EXCEEDED); } + CurrentMetrics::add(CurrentMetrics::KeeperOutstandingRequets); return true; } @@ -350,6 +361,7 @@ void KeeperDispatcher::shutdown() /// Set session expired for all pending requests while (requests_queue && requests_queue->tryPop(request_for_session)) { + CurrentMetrics::sub(CurrentMetrics::KeeperOutstandingRequets); auto response = request_for_session.request->makeResponse(); response->error = Coordination::Error::ZSESSIONEXPIRED; setResponse(request_for_session.session_id, response); @@ -358,6 +370,7 @@ void KeeperDispatcher::shutdown() /// Clear all registered sessions std::lock_guard lock(session_to_response_callback_mutex); session_to_response_callback.clear(); + CurrentMetrics::set(CurrentMetrics::KeeperAliveConnections, 0); } catch (...) { @@ -382,6 +395,7 @@ void KeeperDispatcher::registerSession(int64_t session_id, ZooKeeperResponseCall std::lock_guard lock(session_to_response_callback_mutex); if (!session_to_response_callback.try_emplace(session_id, callback).second) throw Exception(DB::ErrorCodes::LOGICAL_ERROR, "Session with id {} already registered in dispatcher", session_id); + CurrentMetrics::add(CurrentMetrics::KeeperAliveConnections); } void KeeperDispatcher::sessionCleanerTask() @@ -414,6 +428,7 @@ void KeeperDispatcher::sessionCleanerTask() std::lock_guard lock(push_request_mutex); if (!requests_queue->push(std::move(request_info))) LOG_INFO(log, "Cannot push close request to queue while cleaning outdated sessions"); + CurrentMetrics::add(CurrentMetrics::KeeperOutstandingRequets); } /// Remove session from registered sessions @@ -437,7 +452,10 @@ void KeeperDispatcher::finishSession(int64_t session_id) std::lock_guard lock(session_to_response_callback_mutex); auto session_it = session_to_response_callback.find(session_id); if (session_it != session_to_response_callback.end()) + { session_to_response_callback.erase(session_it); + CurrentMetrics::sub(CurrentMetrics::KeeperAliveConnections); + } } void KeeperDispatcher::addErrorResponses(const KeeperStorage::RequestsForSessions & requests_for_sessions, Coordination::Error error) @@ -520,6 +538,7 @@ int64_t KeeperDispatcher::getSessionID(int64_t session_timeout_ms) std::lock_guard lock(push_request_mutex); if (!requests_queue->tryPush(std::move(request_info), session_timeout_ms)) throw Exception("Cannot push session id request to queue within session timeout", ErrorCodes::TIMEOUT_EXCEEDED); + CurrentMetrics::add(CurrentMetrics::KeeperOutstandingRequets); } if (future.wait_for(std::chrono::milliseconds(session_timeout_ms)) != std::future_status::ready) diff --git a/src/Coordination/KeeperStateMachine.cpp b/src/Coordination/KeeperStateMachine.cpp index 368b23f34d2..b0f790e5c1c 100644 --- a/src/Coordination/KeeperStateMachine.cpp +++ b/src/Coordination/KeeperStateMachine.cpp @@ -8,8 +8,21 @@ #include #include "Common/ZooKeeper/ZooKeeperCommon.h" #include +#include #include "Coordination/KeeperStorage.h" +namespace ProfileEvents +{ + extern const Event KeeperCommits; + extern const Event KeeperCommitsFailed; + extern const Event KeeperSnapshotCreations; + extern const Event KeeperSnapshotCreationsFailed; + extern const Event KeeperSnapshotApplys; + extern const Event KeeperSnapshotApplysFailed; + extern const Event KeeperReadSnapshot; + extern const Event KeeperSaveSnapshot; +} + namespace DB { @@ -219,7 +232,10 @@ nuraft::ptr KeeperStateMachine::commit(const uint64_t log_idx, n LOG_DEBUG(log, "Session ID response {} with timeout {}", session_id, session_id_request.session_timeout_ms); response->session_id = session_id; if (!responses_queue.push(response_for_session)) + { + ProfileEvents::increment(ProfileEvents::KeeperCommitsFailed); throw Exception(ErrorCodes::SYSTEM_ERROR, "Could not push response with session id {} into responses queue", session_id); + } } } else @@ -229,10 +245,13 @@ nuraft::ptr KeeperStateMachine::commit(const uint64_t log_idx, n request_for_session.request, request_for_session.session_id, request_for_session.zxid); for (auto & response_for_session : responses_for_sessions) if (!responses_queue.push(response_for_session)) + { + ProfileEvents::increment(ProfileEvents::KeeperCommitsFailed); throw Exception( ErrorCodes::SYSTEM_ERROR, "Could not push response with session id {} into responses queue", response_for_session.session_id); + } if (digest_enabled && request_for_session.digest) { @@ -240,6 +259,7 @@ nuraft::ptr KeeperStateMachine::commit(const uint64_t log_idx, n } } + ProfileEvents::increment(ProfileEvents::KeeperCommits); last_committed_idx = log_idx; return nullptr; } @@ -251,11 +271,14 @@ bool KeeperStateMachine::apply_snapshot(nuraft::snapshot & s) { /// save snapshot into memory std::lock_guard lock(snapshots_lock); if (s.get_last_log_idx() != latest_snapshot_meta->get_last_log_idx()) + { + ProfileEvents::increment(ProfileEvents::KeeperSnapshotApplysFailed); throw Exception( ErrorCodes::LOGICAL_ERROR, "Required to apply snapshot with last log index {}, but our last log index is {}", s.get_last_log_idx(), latest_snapshot_meta->get_last_log_idx()); + } latest_snapshot_ptr = latest_snapshot_buf; } @@ -268,6 +291,7 @@ bool KeeperStateMachine::apply_snapshot(nuraft::snapshot & s) cluster_config = snapshot_deserialization_result.cluster_config; } + ProfileEvents::increment(ProfileEvents::KeeperSnapshotApplys); last_committed_idx = s.get_last_log_idx(); return true; } @@ -332,6 +356,7 @@ void KeeperStateMachine::create_snapshot(nuraft::snapshot & s, nuraft::async_res } latest_snapshot_path = path; latest_snapshot_meta = snapshot->snapshot_meta; + ProfileEvents::increment(ProfileEvents::KeeperSnapshotCreations); LOG_DEBUG(log, "Created persistent snapshot {} with path {}", latest_snapshot_meta->get_last_log_idx(), path); } @@ -347,6 +372,7 @@ void KeeperStateMachine::create_snapshot(nuraft::snapshot & s, nuraft::async_res } catch (...) { + ProfileEvents::increment(ProfileEvents::KeeperSnapshotCreationsFailed); LOG_TRACE(log, "Exception happened during snapshot"); tryLogCurrentException(log); ret = false; @@ -380,6 +406,7 @@ void KeeperStateMachine::save_logical_snp_obj( latest_snapshot_meta = cloned_meta; LOG_DEBUG(log, "Saved snapshot {} to path {}", s.get_last_log_idx(), result_path); obj_id++; + ProfileEvents::increment(ProfileEvents::KeeperSaveSnapshot); } catch (...) { @@ -441,6 +468,7 @@ int KeeperStateMachine::read_logical_snp_obj( return -1; } is_last_obj = true; + ProfileEvents::increment(ProfileEvents::KeeperReadSnapshot); return 1; } diff --git a/src/Interpreters/AsynchronousMetrics.cpp b/src/Interpreters/AsynchronousMetrics.cpp index 4ac5acfd60f..505f956fd5a 100644 --- a/src/Interpreters/AsynchronousMetrics.cpp +++ b/src/Interpreters/AsynchronousMetrics.cpp @@ -4,6 +4,8 @@ #include #include #include +#include +#include #include #include #include @@ -11,6 +13,8 @@ #include #include #include +#include +#include #include #include #include @@ -1464,6 +1468,98 @@ void AsynchronousMetrics::update(std::chrono::system_clock::time_point update_ti new_values[name] = server_metric.current_threads; } } +#if USE_NURAFT + { + try { + auto keeper_dispatcher = getContext()->getKeeperDispatcher(); + size_t is_leader = 0; + size_t is_follower = 0; + size_t is_observer = 0; + size_t is_standalone = 0; + size_t znode_count = 0; + size_t watch_count =0; + size_t ephemerals_count = 0; + size_t approximate_data_size =0; + size_t key_arena_size = 0; + size_t latest_snapshot_size =0; + size_t open_file_descriptor_count =0; + size_t max_file_descriptor_count =0; + size_t followers =0; + size_t synced_followers = 0; + size_t zxid = 0; + size_t session_with_watches = 0; + size_t paths_watched = 0; + size_t snapshot_dir_size = 0; + size_t log_dir_size = 0; + + if (keeper_dispatcher && keeper_dispatcher->isServerActive()) + { + auto keeper_4LW_info = keeper_dispatcher -> getKeeper4LWInfo(); + if (keeper_4LW_info.is_standalone) + is_standalone = 1; + if (keeper_4LW_info.is_leader) + is_leader = 1; + if (keeper_4LW_info.is_observer) + is_observer = 1; + if (keeper_4LW_info.is_follower) + is_follower = 1; + + zxid = keeper_4LW_info.last_zxid; + const auto & state_machine = keeper_dispatcher->getStateMachine(); + znode_count = state_machine.getNodesCount(); + watch_count = state_machine.getTotalWatchesCount(); + ephemerals_count = state_machine.getTotalEphemeralNodesCount(); + approximate_data_size =state_machine.getApproximateDataSize(); + key_arena_size=state_machine.getKeyArenaSize(); + latest_snapshot_size=state_machine.getLatestSnapshotBufSize(); + session_with_watches = state_machine.getSessionsWithWatchesCount(); + paths_watched = state_machine.getWatchedPathsCount(); + snapshot_dir_size = keeper_dispatcher->getSnapDirSize(); + log_dir_size = keeper_dispatcher->getLogDirSize(); + + #if defined(__linux__) || defined(__APPLE__) + open_file_descriptor_count = getCurrentProcessFDCount(); + max_file_descriptor_count = getMaxFileDescriptorCount(); + #endif + + if (keeper_4LW_info.is_leader) + { + followers = keeper_4LW_info.follower_count; + synced_followers = keeper_4LW_info.synced_follower_count; + } + } + + new_values["KeeperIsLeader"] = is_leader; + new_values["KeeperIsFollower"] = is_follower; + new_values["KeeperIsObserver"] = is_observer; + new_values["KeeperIsStandalone"] = is_standalone; + + new_values["KeeperZnodeCount"] = znode_count; + new_values["KeeperWatchCount"] = watch_count; + new_values["KeeperEphemeralsCount"] = ephemerals_count; + + new_values["KeeperApproximateDataSize"] = approximate_data_size; + new_values["KeeperKeyArenaSize"] = key_arena_size; + + new_values["KeeperLatestSnapshotSize"] = latest_snapshot_size; + + new_values["KeeperOpenFileDescriptorCount"] = open_file_descriptor_count; + new_values["KeeperMaxFileDescriptorCount"] = max_file_descriptor_count; + + new_values["KeeperFollowers"] = followers; + new_values["KeeperSyncedFollowers"] = synced_followers; + new_values["KeeperZxid"] = zxid; + new_values["KeeperSessionWithWatches"] = session_with_watches; + new_values["KeeperPathsWatched"] = paths_watched; + new_values["KeeperSnapshotDirSize"] = snapshot_dir_size; + new_values["KeeperLogDirSize"] = log_dir_size; + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + } + } +#endif #if USE_JEMALLOC // 'epoch' is a special mallctl -- it updates the statistics. Without it, all