Mirror of https://github.com/ClickHouse/ClickHouse.git (synced 2024-11-24 08:32:02 +00:00)
Merge pull request #38072 from lingpeng0314/master
Add Keeper related monitoring data
commit 37f799550b
@@ -49,6 +49,12 @@ std::shared_ptr<KeeperDispatcher> TinyContext::getKeeperDispatcher() const
    return keeper_dispatcher;
}

std::shared_ptr<KeeperDispatcher> TinyContext::tryGetKeeperDispatcher() const
{
    std::lock_guard lock(keeper_dispatcher_mutex);
    return keeper_dispatcher;
}

void TinyContext::shutdownKeeperDispatcher() const
{
    std::lock_guard lock(keeper_dispatcher_mutex);
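Note (not part of this diff): the new tryGetKeeperDispatcher() simply returns the currently stored dispatcher without initializing anything, so the result may be null and callers are expected to check it. A minimal caller-side sketch of that pattern, assuming the TinyContext and KeeperDispatcher declarations are in scope (the function name is hypothetical; the dispatcher calls mirror the AsynchronousMetrics change further down):

    void collectKeeperMetrics(TinyContext & context)
    {
        if (auto keeper_dispatcher = context.tryGetKeeperDispatcher())
        {
            /// Keeper is initialized in this process: safe to ask for monitoring data.
            auto info = keeper_dispatcher->getKeeper4LWInfo();
        }
        /// Otherwise Keeper is not running here; skip collection.
    }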
@@ -14,6 +14,7 @@ class TinyContext: public std::enable_shared_from_this<TinyContext>
{
public:
    std::shared_ptr<KeeperDispatcher> getKeeperDispatcher() const;
    std::shared_ptr<KeeperDispatcher> tryGetKeeperDispatcher() const;
    void initializeKeeperDispatcher(bool start_async) const;
    void shutdownKeeperDispatcher() const;
    void updateKeeperConfiguration(const Poco::Util::AbstractConfiguration & config);
@@ -93,6 +93,8 @@
    M(CacheFileSegments, "Number of existing cache file segments") \
    M(CacheDetachedFileSegments, "Number of existing detached cache file segments") \
    M(S3Requests, "S3 requests") \
    M(KeeperAliveConnections, "Number of alive connections") \
    M(KeeperOutstandingRequets, "Number of outstanding requests") \

namespace CurrentMetrics
{
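Note (not part of this diff): entries in this macro list become process-wide gauges, and code moves them up and down with CurrentMetrics::add()/sub() (or set()), which is exactly what the KeeperDispatcher changes below do for alive connections and outstanding requests. A minimal sketch of the pattern; the two call-site functions are hypothetical:

    #include <Common/CurrentMetrics.h>

    namespace CurrentMetrics
    {
        extern const Metric KeeperAliveConnections;
    }

    void onSessionRegistered()
    {
        CurrentMetrics::add(CurrentMetrics::KeeperAliveConnections);   /// gauge += 1
    }

    void onSessionFinished()
    {
        CurrentMetrics::sub(CurrentMetrics::KeeperAliveConnections);   /// gauge -= 1
    }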
@@ -344,7 +344,20 @@
    \
    M(ScalarSubqueriesGlobalCacheHit, "Number of times a read from a scalar subquery was done using the global cache") \
    M(ScalarSubqueriesLocalCacheHit, "Number of times a read from a scalar subquery was done using the local cache") \
    M(ScalarSubqueriesCacheMiss, "Number of times a read from a scalar subquery was not cached and had to be calculated completely") \
    M(KeeperPacketsSent, "Packets sent by keeper server") \
    M(KeeperPacketsReceived, "Packets received by keeper server") \
    M(KeeperRequestTotal, "Total requests number on keeper server") \
    M(KeeperLatency, "Keeper latency") \
    M(KeeperCommits, "Number of successful commits") \
    M(KeeperCommitsFailed, "Number of failed commits") \
    M(KeeperSnapshotCreations, "Number of snapshots creations") \
    M(KeeperSnapshotCreationsFailed, "Number of failed snapshot creations") \
    M(KeeperSnapshotApplys, "Number of snapshot applying") \
    M(KeeperSnapshotApplysFailed, "Number of failed snapshot applying") \
    M(KeeperReadSnapshot, "Number of snapshot read(serialization)") \
    M(KeeperSaveSnapshot, "Number of snapshot save") \

namespace ProfileEvents
{
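Note (not part of this diff): unlike CurrentMetrics gauges, ProfileEvents entries are monotonically increasing counters bumped with ProfileEvents::increment(event, amount); that is how the KeeperConnectionStats and KeeperStateMachine changes below report packets, commits and snapshot activity. A minimal sketch; the call-site function is hypothetical:

    #include <Common/ProfileEvents.h>

    namespace ProfileEvents
    {
        extern const Event KeeperCommits;
        extern const Event KeeperLatency;
    }

    void onRequestCommitted(uint64_t latency_ms)
    {
        ProfileEvents::increment(ProfileEvents::KeeperCommits);               /// count += 1
        ProfileEvents::increment(ProfileEvents::KeeperLatency, latency_ms);   /// accumulate latency
    }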
@@ -1,5 +1,14 @@
#include <atomic>
#include <Coordination/KeeperConnectionStats.h>
#include <Common/ProfileEvents.h>

namespace ProfileEvents
{
    extern const Event KeeperPacketsSent;
    extern const Event KeeperPacketsReceived;
    extern const Event KeeperRequestTotal;
    extern const Event KeeperLatency;
}

namespace DB
{
@@ -40,18 +49,22 @@ uint64_t KeeperConnectionStats::getPacketsSent() const
void KeeperConnectionStats::incrementPacketsReceived()
{
    packets_received.fetch_add(1, std::memory_order_relaxed);
    ProfileEvents::increment(ProfileEvents::KeeperPacketsReceived, 1);
}

void KeeperConnectionStats::incrementPacketsSent()
{
    packets_sent.fetch_add(1, std::memory_order_relaxed);
    ProfileEvents::increment(ProfileEvents::KeeperPacketsSent, 1);
}

void KeeperConnectionStats::updateLatency(uint64_t latency_ms)
{
    last_latency.store(latency_ms, std::memory_order_relaxed);
    total_latency.fetch_add(latency_ms, std::memory_order_relaxed);
    ProfileEvents::increment(ProfileEvents::KeeperLatency, latency_ms);
    count.fetch_add(1, std::memory_order_relaxed);
    ProfileEvents::increment(ProfileEvents::KeeperRequestTotal, 1);

    uint64_t prev_val = min_latency.load(std::memory_order_relaxed);
    while (prev_val > latency_ms && !min_latency.compare_exchange_weak(prev_val, latency_ms, std::memory_order_relaxed)) {}
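The last line of updateLatency() is the usual lock-free "keep the minimum" idiom: when compare_exchange_weak fails it reloads the current value into prev_val, so the loop retries until either the stored minimum is already less than or equal to the new sample or the swap succeeds. A self-contained sketch of the same idiom (illustrative, not project code):

    #include <atomic>
    #include <cstdint>

    std::atomic<uint64_t> min_latency{UINT64_MAX};

    void updateMin(uint64_t sample)
    {
        uint64_t prev = min_latency.load(std::memory_order_relaxed);
        /// On failure, compare_exchange_weak writes the current stored value back into prev,
        /// so the condition is re-evaluated against fresh data on every iteration.
        while (prev > sample
               && !min_latency.compare_exchange_weak(prev, sample, std::memory_order_relaxed))
        {
        }
    }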
@@ -7,6 +7,13 @@
#include <Common/hex.h>
#include <filesystem>
#include <Common/checkStackSize.h>
#include <Common/CurrentMetrics.h>

namespace CurrentMetrics
{
    extern const Metric KeeperAliveConnections;
    extern const Metric KeeperOutstandingRequets;
}

namespace fs = std::filesystem;

@@ -57,6 +64,7 @@ void KeeperDispatcher::requestThread()
    {
        if (requests_queue->tryPop(request, max_wait))
        {
            CurrentMetrics::sub(CurrentMetrics::KeeperOutstandingRequets);
            if (shutdown_called)
                break;

@@ -78,6 +86,7 @@ void KeeperDispatcher::requestThread()
            /// Trying to get batch requests as fast as possible
            if (requests_queue->tryPop(request, 1))
            {
                CurrentMetrics::sub(CurrentMetrics::KeeperOutstandingRequets);
                /// Don't append read request into batch, we have to process them separately
                if (!coordination_settings->quorum_reads && request.request->isReadRequest())
                {

@@ -226,6 +235,7 @@ void KeeperDispatcher::setResponse(int64_t session_id, const Coordination::ZooKe
        if (response->xid != Coordination::WATCH_XID && response->getOpNum() == Coordination::OpNum::Close)
        {
            session_to_response_callback.erase(session_response_callback);
            CurrentMetrics::sub(CurrentMetrics::KeeperAliveConnections);
        }
    }
}

@@ -260,6 +270,7 @@ bool KeeperDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr & requ
    {
        throw Exception("Cannot push request to queue within operation timeout", ErrorCodes::TIMEOUT_EXCEEDED);
    }
    CurrentMetrics::add(CurrentMetrics::KeeperOutstandingRequets);
    return true;
}

@@ -351,6 +362,7 @@ void KeeperDispatcher::shutdown()
        /// Set session expired for all pending requests
        while (requests_queue && requests_queue->tryPop(request_for_session))
        {
            CurrentMetrics::sub(CurrentMetrics::KeeperOutstandingRequets);
            auto response = request_for_session.request->makeResponse();
            response->error = Coordination::Error::ZSESSIONEXPIRED;
            setResponse(request_for_session.session_id, response);

@@ -359,6 +371,7 @@ void KeeperDispatcher::shutdown()
        /// Clear all registered sessions
        std::lock_guard lock(session_to_response_callback_mutex);
        session_to_response_callback.clear();
        CurrentMetrics::set(CurrentMetrics::KeeperAliveConnections, 0);
    }
    catch (...)
    {

@@ -383,6 +396,7 @@ void KeeperDispatcher::registerSession(int64_t session_id, ZooKeeperResponseCall
    std::lock_guard lock(session_to_response_callback_mutex);
    if (!session_to_response_callback.try_emplace(session_id, callback).second)
        throw Exception(DB::ErrorCodes::LOGICAL_ERROR, "Session with id {} already registered in dispatcher", session_id);
    CurrentMetrics::add(CurrentMetrics::KeeperAliveConnections);
}

void KeeperDispatcher::sessionCleanerTask()
@@ -415,6 +429,7 @@ void KeeperDispatcher::sessionCleanerTask()
                std::lock_guard lock(push_request_mutex);
                if (!requests_queue->push(std::move(request_info)))
                    LOG_INFO(log, "Cannot push close request to queue while cleaning outdated sessions");
                CurrentMetrics::add(CurrentMetrics::KeeperOutstandingRequets);
            }

            /// Remove session from registered sessions
@@ -438,7 +453,10 @@ void KeeperDispatcher::finishSession(int64_t session_id)
    std::lock_guard lock(session_to_response_callback_mutex);
    auto session_it = session_to_response_callback.find(session_id);
    if (session_it != session_to_response_callback.end())
    {
        session_to_response_callback.erase(session_it);
        CurrentMetrics::sub(CurrentMetrics::KeeperAliveConnections);
    }
}

void KeeperDispatcher::addErrorResponses(const KeeperStorage::RequestsForSessions & requests_for_sessions, Coordination::Error error)
@@ -521,6 +539,7 @@ int64_t KeeperDispatcher::getSessionID(int64_t session_timeout_ms)
        std::lock_guard lock(push_request_mutex);
        if (!requests_queue->tryPush(std::move(request_info), session_timeout_ms))
            throw Exception("Cannot push session id request to queue within session timeout", ErrorCodes::TIMEOUT_EXCEEDED);
        CurrentMetrics::add(CurrentMetrics::KeeperOutstandingRequets);
    }

    if (future.wait_for(std::chrono::milliseconds(session_timeout_ms)) != std::future_status::ready)
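Note (not part of this diff): the KeeperOutstandingRequets and KeeperAliveConnections gauges are adjusted manually at every push, pop, erase and shutdown site above so that they stay balanced across all exit paths. For values whose lifetime matches a single scope, ClickHouse also offers the RAII helper CurrentMetrics::Increment, which undoes the increment automatically when the scope is left; a sketch under that assumption, with a hypothetical call site:

    #include <Common/CurrentMetrics.h>

    namespace CurrentMetrics
    {
        extern const Metric KeeperAliveConnections;
    }

    void serveConnection()
    {
        /// +1 on construction, -1 on destruction, including when an exception unwinds.
        CurrentMetrics::Increment metric_increment{CurrentMetrics::KeeperAliveConnections};
        /// ... handle the connection ...
    }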
@@ -8,8 +8,21 @@
#include <sys/mman.h>
#include "Common/ZooKeeper/ZooKeeperCommon.h"
#include <Common/ZooKeeper/ZooKeeperIO.h>
#include <Common/ProfileEvents.h>
#include "Coordination/KeeperStorage.h"

namespace ProfileEvents
{
    extern const Event KeeperCommits;
    extern const Event KeeperCommitsFailed;
    extern const Event KeeperSnapshotCreations;
    extern const Event KeeperSnapshotCreationsFailed;
    extern const Event KeeperSnapshotApplys;
    extern const Event KeeperSnapshotApplysFailed;
    extern const Event KeeperReadSnapshot;
    extern const Event KeeperSaveSnapshot;
}

namespace DB
{

@@ -219,7 +232,10 @@ nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, n
            LOG_DEBUG(log, "Session ID response {} with timeout {}", session_id, session_id_request.session_timeout_ms);
            response->session_id = session_id;
            if (!responses_queue.push(response_for_session))
            {
                ProfileEvents::increment(ProfileEvents::KeeperCommitsFailed);
                throw Exception(ErrorCodes::SYSTEM_ERROR, "Could not push response with session id {} into responses queue", session_id);
            }
        }
    }
    else
@@ -229,10 +245,13 @@ nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, n
            request_for_session.request, request_for_session.session_id, request_for_session.zxid);
        for (auto & response_for_session : responses_for_sessions)
            if (!responses_queue.push(response_for_session))
            {
                ProfileEvents::increment(ProfileEvents::KeeperCommitsFailed);
                throw Exception(
                    ErrorCodes::SYSTEM_ERROR,
                    "Could not push response with session id {} into responses queue",
                    response_for_session.session_id);
            }

        if (digest_enabled && request_for_session.digest)
        {
@@ -240,6 +259,7 @@ nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, n
        }
    }

    ProfileEvents::increment(ProfileEvents::KeeperCommits);
    last_committed_idx = log_idx;
    return nullptr;
}
@@ -251,11 +271,14 @@ bool KeeperStateMachine::apply_snapshot(nuraft::snapshot & s)
    { /// save snapshot into memory
        std::lock_guard lock(snapshots_lock);
        if (s.get_last_log_idx() != latest_snapshot_meta->get_last_log_idx())
        {
            ProfileEvents::increment(ProfileEvents::KeeperSnapshotApplysFailed);
            throw Exception(
                ErrorCodes::LOGICAL_ERROR,
                "Required to apply snapshot with last log index {}, but our last log index is {}",
                s.get_last_log_idx(),
                latest_snapshot_meta->get_last_log_idx());
        }
        latest_snapshot_ptr = latest_snapshot_buf;
    }

@@ -268,6 +291,7 @@ bool KeeperStateMachine::apply_snapshot(nuraft::snapshot & s)
        cluster_config = snapshot_deserialization_result.cluster_config;
    }

    ProfileEvents::increment(ProfileEvents::KeeperSnapshotApplys);
    last_committed_idx = s.get_last_log_idx();
    return true;
}
@@ -335,6 +359,7 @@ void KeeperStateMachine::create_snapshot(nuraft::snapshot & s, nuraft::async_res
            }
            latest_snapshot_path = path;
            latest_snapshot_meta = snapshot->snapshot_meta;
            ProfileEvents::increment(ProfileEvents::KeeperSnapshotCreations);
            LOG_DEBUG(log, "Created persistent snapshot {} with path {}", latest_snapshot_meta->get_last_log_idx(), path);
        }

@@ -350,6 +375,7 @@ void KeeperStateMachine::create_snapshot(nuraft::snapshot & s, nuraft::async_res
        }
        catch (...)
        {
            ProfileEvents::increment(ProfileEvents::KeeperSnapshotCreationsFailed);
            LOG_TRACE(log, "Exception happened during snapshot");
            tryLogCurrentException(log);
            ret = false;
@@ -383,6 +409,7 @@ void KeeperStateMachine::save_logical_snp_obj(
        latest_snapshot_meta = cloned_meta;
        LOG_DEBUG(log, "Saved snapshot {} to path {}", s.get_last_log_idx(), result_path);
        obj_id++;
        ProfileEvents::increment(ProfileEvents::KeeperSaveSnapshot);
    }
    catch (...)
    {
@@ -444,6 +471,7 @@ int KeeperStateMachine::read_logical_snp_obj(
        return -1;
    }
    is_last_obj = true;
    ProfileEvents::increment(ProfileEvents::KeeperReadSnapshot);

    return 1;
}
@@ -4,6 +4,8 @@
#include <Interpreters/JIT/CompiledExpressionCache.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Interpreters/Context.h>
#include <Coordination/Keeper4LWInfo.h>
#include <Coordination/KeeperDispatcher.h>
#include <Common/Exception.h>
#include <Common/setThreadName.h>
#include <Common/CurrentMetrics.h>
@@ -11,6 +13,8 @@
#include <Common/filesystemHelpers.h>
#include <Common/FileCacheFactory.h>
#include <Common/IFileCache.h>
#include <Common/getCurrentProcessFDCount.h>
#include <Common/getMaxFileDescriptorCount.h>
#include <Server/ProtocolServerAdapter.h>
#include <Storages/MarkCache.h>
#include <Storages/StorageMergeTree.h>
@@ -1464,6 +1468,90 @@ void AsynchronousMetrics::update(std::chrono::system_clock::time_point update_ti
            new_values[name] = server_metric.current_threads;
        }
    }
#if USE_NURAFT
    {
        auto keeper_dispatcher = getContext()->tryGetKeeperDispatcher();
        if (keeper_dispatcher)
        {
            size_t is_leader = 0;
            size_t is_follower = 0;
            size_t is_observer = 0;
            size_t is_standalone = 0;
            size_t znode_count = 0;
            size_t watch_count = 0;
            size_t ephemerals_count = 0;
            size_t approximate_data_size = 0;
            size_t key_arena_size = 0;
            size_t latest_snapshot_size = 0;
            size_t open_file_descriptor_count = 0;
            size_t max_file_descriptor_count = 0;
            size_t followers = 0;
            size_t synced_followers = 0;
            size_t zxid = 0;
            size_t session_with_watches = 0;
            size_t paths_watched = 0;
            size_t snapshot_dir_size = 0;
            size_t log_dir_size = 0;

            if (keeper_dispatcher->isServerActive())
            {
                auto keeper_info = keeper_dispatcher->getKeeper4LWInfo();
                is_standalone = static_cast<size_t>(keeper_info.is_standalone);
                is_leader = static_cast<size_t>(keeper_info.is_leader);
                is_observer = static_cast<size_t>(keeper_info.is_observer);
                is_follower = static_cast<size_t>(keeper_info.is_follower);

                zxid = keeper_info.last_zxid;
                const auto & state_machine = keeper_dispatcher->getStateMachine();
                znode_count = state_machine.getNodesCount();
                watch_count = state_machine.getTotalWatchesCount();
                ephemerals_count = state_machine.getTotalEphemeralNodesCount();
                approximate_data_size = state_machine.getApproximateDataSize();
                key_arena_size = state_machine.getKeyArenaSize();
                latest_snapshot_size = state_machine.getLatestSnapshotBufSize();
                session_with_watches = state_machine.getSessionsWithWatchesCount();
                paths_watched = state_machine.getWatchedPathsCount();
                snapshot_dir_size = keeper_dispatcher->getSnapDirSize();
                log_dir_size = keeper_dispatcher->getLogDirSize();

#if defined(__linux__) || defined(__APPLE__)
                open_file_descriptor_count = getCurrentProcessFDCount();
                max_file_descriptor_count = getMaxFileDescriptorCount();
#endif

                if (keeper_info.is_leader)
                {
                    followers = keeper_info.follower_count;
                    synced_followers = keeper_info.synced_follower_count;
                }
            }

            new_values["KeeperIsLeader"] = is_leader;
            new_values["KeeperIsFollower"] = is_follower;
            new_values["KeeperIsObserver"] = is_observer;
            new_values["KeeperIsStandalone"] = is_standalone;

            new_values["KeeperZnodeCount"] = znode_count;
            new_values["KeeperWatchCount"] = watch_count;
            new_values["KeeperEphemeralsCount"] = ephemerals_count;

            new_values["KeeperApproximateDataSize"] = approximate_data_size;
            new_values["KeeperKeyArenaSize"] = key_arena_size;
            new_values["KeeperLatestSnapshotSize"] = latest_snapshot_size;

            new_values["KeeperOpenFileDescriptorCount"] = open_file_descriptor_count;
            new_values["KeeperMaxFileDescriptorCount"] = max_file_descriptor_count;

            new_values["KeeperFollowers"] = followers;
            new_values["KeeperSyncedFollowers"] = synced_followers;
            new_values["KeeperZxid"] = zxid;
            new_values["KeeperSessionWithWatches"] = session_with_watches;
            new_values["KeeperPathsWatched"] = paths_watched;
            new_values["KeeperSnapshotDirSize"] = snapshot_dir_size;
            new_values["KeeperLogDirSize"] = log_dir_size;
        }
    }
#endif

#if USE_JEMALLOC
    // 'epoch' is a special mallctl -- it updates the statistics. Without it, all
@@ -2101,6 +2101,12 @@ std::shared_ptr<KeeperDispatcher> & Context::getKeeperDispatcher() const

    return shared->keeper_dispatcher;
}

std::shared_ptr<KeeperDispatcher> & Context::tryGetKeeperDispatcher() const
{
    std::lock_guard lock(shared->keeper_dispatcher_mutex);
    return shared->keeper_dispatcher;
}
#endif

void Context::shutdownKeeperDispatcher() const
@@ -757,6 +757,7 @@ public:

#if USE_NURAFT
    std::shared_ptr<KeeperDispatcher> & getKeeperDispatcher() const;
    std::shared_ptr<KeeperDispatcher> & tryGetKeeperDispatcher() const;
#endif
    void initializeKeeperDispatcher(bool start_async) const;
    void shutdownKeeperDispatcher() const;