From 1a952591821a9f6969a09875c5403e8ac38e7d2a Mon Sep 17 00:00:00 2001 From: Antonio Andelic Date: Mon, 1 Jul 2024 15:17:41 +0200 Subject: [PATCH 01/10] Add extra profiling helpers for Keeper --- src/Common/ProfileEvents.cpp | 7 ++ src/Common/ZooKeeper/ZooKeeperCommon.cpp | 36 ++++++----- src/Common/ZooKeeper/ZooKeeperCommon.h | 32 ++++----- src/Coordination/CoordinationSettings.h | 5 +- src/Coordination/KeeperConstants.cpp | 7 ++ src/Coordination/KeeperDispatcher.cpp | 17 ++++- src/Coordination/KeeperStateMachine.cpp | 82 +++++++++++++++--------- src/Coordination/KeeperStateMachine.h | 7 +- src/Coordination/KeeperStorage.cpp | 30 +++++++++ src/Coordination/SnapshotableHashTable.h | 1 + src/Server/KeeperTCPHandler.cpp | 19 ++++-- 11 files changed, 168 insertions(+), 75 deletions(-) diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp index d98373b6c55..a1058a879bd 100644 --- a/src/Common/ProfileEvents.cpp +++ b/src/Common/ProfileEvents.cpp @@ -611,6 +611,13 @@ The server successfully detected this situation and will download merged part fr M(KeeperPacketsReceived, "Packets received by keeper server") \ M(KeeperRequestTotal, "Total requests number on keeper server") \ M(KeeperLatency, "Keeper latency") \ + M(KeeperTotalElapsedMicroseconds, "Keeper total latency for a single request") \ + M(KeeperProcessElapsedMicroseconds, "Keeper commit latency for a single request") \ + M(KeeperPreprocessElapsedMicroseconds, "Keeper preprocessing latency for a single request") \ + M(KeeperStorageLockWaitMicroseconds, "Time spent waiting for acquiring Keeper storage lock") \ + M(KeeperCommitWaitElapsedMicroseconds, "Time spent waiting for certain log to be committed") \ + M(KeeperBatchMaxCount, "Number of times the size of batch was limited by the amount") \ + M(KeeperBatchMaxTotalSize, "Number of times the size of batch was limited by the total bytes size") \ M(KeeperCommits, "Number of successful commits") \ M(KeeperCommitsFailed, "Number of failed commits") 
\ M(KeeperSnapshotCreations, "Number of snapshots creations")\ diff --git a/src/Common/ZooKeeper/ZooKeeperCommon.cpp b/src/Common/ZooKeeper/ZooKeeperCommon.cpp index 48bb510e589..dff14f74681 100644 --- a/src/Common/ZooKeeper/ZooKeeperCommon.cpp +++ b/src/Common/ZooKeeper/ZooKeeperCommon.cpp @@ -9,7 +9,6 @@ #include #include #include -#include namespace Coordination @@ -29,7 +28,7 @@ void ZooKeeperResponse::write(WriteBuffer & out) const Coordination::write(buf.str(), out); } -std::string ZooKeeperRequest::toString() const +std::string ZooKeeperRequest::toString(bool short_format) const { return fmt::format( "XID = {}\n" @@ -37,7 +36,7 @@ std::string ZooKeeperRequest::toString() const "Additional info:\n{}", xid, getOpNum(), - toStringImpl()); + toStringImpl(short_format)); } void ZooKeeperRequest::write(WriteBuffer & out) const @@ -60,7 +59,7 @@ void ZooKeeperSyncRequest::readImpl(ReadBuffer & in) Coordination::read(path, in); } -std::string ZooKeeperSyncRequest::toStringImpl() const +std::string ZooKeeperSyncRequest::toStringImpl(bool /*short_format*/) const { return fmt::format("path = {}", path); } @@ -91,7 +90,7 @@ void ZooKeeperReconfigRequest::readImpl(ReadBuffer & in) Coordination::read(version, in); } -std::string ZooKeeperReconfigRequest::toStringImpl() const +std::string ZooKeeperReconfigRequest::toStringImpl(bool /*short_format*/) const { return fmt::format( "joining = {}\nleaving = {}\nnew_members = {}\nversion = {}", @@ -145,7 +144,7 @@ void ZooKeeperAuthRequest::readImpl(ReadBuffer & in) Coordination::read(data, in); } -std::string ZooKeeperAuthRequest::toStringImpl() const +std::string ZooKeeperAuthRequest::toStringImpl(bool /*short_format*/) const { return fmt::format( "type = {}\n" @@ -191,7 +190,7 @@ void ZooKeeperCreateRequest::readImpl(ReadBuffer & in) is_sequential = true; } -std::string ZooKeeperCreateRequest::toStringImpl() const +std::string ZooKeeperCreateRequest::toStringImpl(bool /*short_format*/) const { return fmt::format( "path = {}\n" 
@@ -218,7 +217,7 @@ void ZooKeeperRemoveRequest::writeImpl(WriteBuffer & out) const Coordination::write(version, out); } -std::string ZooKeeperRemoveRequest::toStringImpl() const +std::string ZooKeeperRemoveRequest::toStringImpl(bool /*short_format*/) const { return fmt::format( "path = {}\n" @@ -245,7 +244,7 @@ void ZooKeeperExistsRequest::readImpl(ReadBuffer & in) Coordination::read(has_watch, in); } -std::string ZooKeeperExistsRequest::toStringImpl() const +std::string ZooKeeperExistsRequest::toStringImpl(bool /*short_format*/) const { return fmt::format("path = {}", path); } @@ -272,7 +271,7 @@ void ZooKeeperGetRequest::readImpl(ReadBuffer & in) Coordination::read(has_watch, in); } -std::string ZooKeeperGetRequest::toStringImpl() const +std::string ZooKeeperGetRequest::toStringImpl(bool /*short_format*/) const { return fmt::format("path = {}", path); } @@ -303,7 +302,7 @@ void ZooKeeperSetRequest::readImpl(ReadBuffer & in) Coordination::read(version, in); } -std::string ZooKeeperSetRequest::toStringImpl() const +std::string ZooKeeperSetRequest::toStringImpl(bool /*short_format*/) const { return fmt::format( "path = {}\n" @@ -334,7 +333,7 @@ void ZooKeeperListRequest::readImpl(ReadBuffer & in) Coordination::read(has_watch, in); } -std::string ZooKeeperListRequest::toStringImpl() const +std::string ZooKeeperListRequest::toStringImpl(bool /*short_format*/) const { return fmt::format("path = {}", path); } @@ -356,7 +355,7 @@ void ZooKeeperFilteredListRequest::readImpl(ReadBuffer & in) list_request_type = static_cast(read_request_type); } -std::string ZooKeeperFilteredListRequest::toStringImpl() const +std::string ZooKeeperFilteredListRequest::toStringImpl(bool /*short_format*/) const { return fmt::format( "path = {}\n" @@ -401,7 +400,7 @@ void ZooKeeperSetACLRequest::readImpl(ReadBuffer & in) Coordination::read(version, in); } -std::string ZooKeeperSetACLRequest::toStringImpl() const +std::string ZooKeeperSetACLRequest::toStringImpl(bool /*short_format*/) const { 
return fmt::format("path = {}\nversion = {}", path, version); } @@ -426,7 +425,7 @@ void ZooKeeperGetACLRequest::writeImpl(WriteBuffer & out) const Coordination::write(path, out); } -std::string ZooKeeperGetACLRequest::toStringImpl() const +std::string ZooKeeperGetACLRequest::toStringImpl(bool /*short_format*/) const { return fmt::format("path = {}", path); } @@ -455,7 +454,7 @@ void ZooKeeperCheckRequest::readImpl(ReadBuffer & in) Coordination::read(version, in); } -std::string ZooKeeperCheckRequest::toStringImpl() const +std::string ZooKeeperCheckRequest::toStringImpl(bool /*short_format*/) const { return fmt::format("path = {}\nversion = {}", path, version); } @@ -600,8 +599,11 @@ void ZooKeeperMultiRequest::readImpl(ReadBuffer & in) } } -std::string ZooKeeperMultiRequest::toStringImpl() const +std::string ZooKeeperMultiRequest::toStringImpl(bool short_format) const { + if (short_format) + return fmt::format("Subrequests size = {}", requests.size()); + auto out = fmt::memory_buffer(); for (const auto & request : requests) { diff --git a/src/Common/ZooKeeper/ZooKeeperCommon.h b/src/Common/ZooKeeper/ZooKeeperCommon.h index 490c2dce4f8..fd6ec3cd375 100644 --- a/src/Common/ZooKeeper/ZooKeeperCommon.h +++ b/src/Common/ZooKeeper/ZooKeeperCommon.h @@ -63,12 +63,12 @@ struct ZooKeeperRequest : virtual Request /// Writes length, xid, op_num, then the rest. 
void write(WriteBuffer & out) const; - std::string toString() const; + std::string toString(bool short_format = false) const; virtual void writeImpl(WriteBuffer &) const = 0; virtual void readImpl(ReadBuffer &) = 0; - virtual std::string toStringImpl() const { return ""; } + virtual std::string toStringImpl(bool /*short_format*/) const { return ""; } static std::shared_ptr read(ReadBuffer & in); @@ -98,7 +98,7 @@ struct ZooKeeperSyncRequest final : ZooKeeperRequest OpNum getOpNum() const override { return OpNum::Sync; } void writeImpl(WriteBuffer & out) const override; void readImpl(ReadBuffer & in) override; - std::string toStringImpl() const override; + std::string toStringImpl(bool short_format) const override; ZooKeeperResponsePtr makeResponse() const override; bool isReadRequest() const override { return false; } @@ -123,7 +123,7 @@ struct ZooKeeperReconfigRequest final : ZooKeeperRequest OpNum getOpNum() const override { return OpNum::Reconfig; } void writeImpl(WriteBuffer & out) const override; void readImpl(ReadBuffer & in) override; - std::string toStringImpl() const override; + std::string toStringImpl(bool short_format) const override; ZooKeeperResponsePtr makeResponse() const override; bool isReadRequest() const override { return false; } @@ -176,7 +176,7 @@ struct ZooKeeperAuthRequest final : ZooKeeperRequest OpNum getOpNum() const override { return OpNum::Auth; } void writeImpl(WriteBuffer & out) const override; void readImpl(ReadBuffer & in) override; - std::string toStringImpl() const override; + std::string toStringImpl(bool short_format) const override; ZooKeeperResponsePtr makeResponse() const override; bool isReadRequest() const override { return false; } @@ -229,7 +229,7 @@ struct ZooKeeperCreateRequest final : public CreateRequest, ZooKeeperRequest OpNum getOpNum() const override { return not_exists ? 
OpNum::CreateIfNotExists : OpNum::Create; } void writeImpl(WriteBuffer & out) const override; void readImpl(ReadBuffer & in) override; - std::string toStringImpl() const override; + std::string toStringImpl(bool short_format) const override; ZooKeeperResponsePtr makeResponse() const override; bool isReadRequest() const override { return false; } @@ -266,7 +266,7 @@ struct ZooKeeperRemoveRequest final : RemoveRequest, ZooKeeperRequest OpNum getOpNum() const override { return OpNum::Remove; } void writeImpl(WriteBuffer & out) const override; void readImpl(ReadBuffer & in) override; - std::string toStringImpl() const override; + std::string toStringImpl(bool short_format) const override; ZooKeeperResponsePtr makeResponse() const override; bool isReadRequest() const override { return false; } @@ -293,7 +293,7 @@ struct ZooKeeperExistsRequest final : ExistsRequest, ZooKeeperRequest OpNum getOpNum() const override { return OpNum::Exists; } void writeImpl(WriteBuffer & out) const override; void readImpl(ReadBuffer & in) override; - std::string toStringImpl() const override; + std::string toStringImpl(bool short_format) const override; ZooKeeperResponsePtr makeResponse() const override; bool isReadRequest() const override { return true; } @@ -320,7 +320,7 @@ struct ZooKeeperGetRequest final : GetRequest, ZooKeeperRequest OpNum getOpNum() const override { return OpNum::Get; } void writeImpl(WriteBuffer & out) const override; void readImpl(ReadBuffer & in) override; - std::string toStringImpl() const override; + std::string toStringImpl(bool short_format) const override; ZooKeeperResponsePtr makeResponse() const override; bool isReadRequest() const override { return true; } @@ -347,7 +347,7 @@ struct ZooKeeperSetRequest final : SetRequest, ZooKeeperRequest OpNum getOpNum() const override { return OpNum::Set; } void writeImpl(WriteBuffer & out) const override; void readImpl(ReadBuffer & in) override; - std::string toStringImpl() const override; + std::string toStringImpl(bool 
short_format) const override; ZooKeeperResponsePtr makeResponse() const override; bool isReadRequest() const override { return false; } @@ -375,7 +375,7 @@ struct ZooKeeperListRequest : ListRequest, ZooKeeperRequest OpNum getOpNum() const override { return OpNum::List; } void writeImpl(WriteBuffer & out) const override; void readImpl(ReadBuffer & in) override; - std::string toStringImpl() const override; + std::string toStringImpl(bool short_format) const override; ZooKeeperResponsePtr makeResponse() const override; bool isReadRequest() const override { return true; } @@ -395,7 +395,7 @@ struct ZooKeeperFilteredListRequest final : ZooKeeperListRequest OpNum getOpNum() const override { return OpNum::FilteredList; } void writeImpl(WriteBuffer & out) const override; void readImpl(ReadBuffer & in) override; - std::string toStringImpl() const override; + std::string toStringImpl(bool short_format) const override; size_t bytesSize() const override { return ZooKeeperListRequest::bytesSize() + sizeof(list_request_type); } }; @@ -428,7 +428,7 @@ struct ZooKeeperCheckRequest : CheckRequest, ZooKeeperRequest OpNum getOpNum() const override { return not_exists ? 
OpNum::CheckNotExists : OpNum::Check; } void writeImpl(WriteBuffer & out) const override; void readImpl(ReadBuffer & in) override; - std::string toStringImpl() const override; + std::string toStringImpl(bool short_format) const override; ZooKeeperResponsePtr makeResponse() const override; bool isReadRequest() const override { return true; } @@ -469,7 +469,7 @@ struct ZooKeeperSetACLRequest final : SetACLRequest, ZooKeeperRequest OpNum getOpNum() const override { return OpNum::SetACL; } void writeImpl(WriteBuffer & out) const override; void readImpl(ReadBuffer & in) override; - std::string toStringImpl() const override; + std::string toStringImpl(bool short_format) const override; ZooKeeperResponsePtr makeResponse() const override; bool isReadRequest() const override { return false; } @@ -490,7 +490,7 @@ struct ZooKeeperGetACLRequest final : GetACLRequest, ZooKeeperRequest OpNum getOpNum() const override { return OpNum::GetACL; } void writeImpl(WriteBuffer & out) const override; void readImpl(ReadBuffer & in) override; - std::string toStringImpl() const override; + std::string toStringImpl(bool short_format) const override; ZooKeeperResponsePtr makeResponse() const override; bool isReadRequest() const override { return true; } @@ -516,7 +516,7 @@ struct ZooKeeperMultiRequest final : MultiRequest, ZooKeeperRequest void writeImpl(WriteBuffer & out) const override; void readImpl(ReadBuffer & in) override; - std::string toStringImpl() const override; + std::string toStringImpl(bool short_format) const override; ZooKeeperResponsePtr makeResponse() const override; bool isReadRequest() const override; diff --git a/src/Coordination/CoordinationSettings.h b/src/Coordination/CoordinationSettings.h index a32552616ee..e7ae1f86d2e 100644 --- a/src/Coordination/CoordinationSettings.h +++ b/src/Coordination/CoordinationSettings.h @@ -58,7 +58,10 @@ struct Settings; M(UInt64, latest_logs_cache_size_threshold, 1 * 1024 * 1024 * 1024, "Maximum total size of in-memory cache of latest 
log entries.", 0) \ M(UInt64, commit_logs_cache_size_threshold, 500 * 1024 * 1024, "Maximum total size of in-memory cache of log entries needed next for commit.", 0) \ M(UInt64, disk_move_retries_wait_ms, 1000, "How long to wait between retries after a failure which happened while a file was being moved between disks.", 0) \ - M(UInt64, disk_move_retries_during_init, 100, "The amount of retries after a failure which happened while a file was being moved between disks during initialization.", 0) + M(UInt64, disk_move_retries_during_init, 100, "The amount of retries after a failure which happened while a file was being moved between disks during initialization.", 0) \ + M(UInt64, log_slow_total_threshold_ms, 5000, "Requests for which the total latency is larger than this setting will be logged", 0) \ + M(UInt64, log_slow_cpu_threshold_ms, 5000, "Requests for which the CPU (preprocessing and processing) latency is larger than this setting will be logged", 0) \ + M(UInt64, log_slow_connection_operation_threshold_ms, 1000, "Log message if a certain operation took too long inside a single connection", 0) DECLARE_SETTINGS_TRAITS(CoordinationSettingsTraits, LIST_OF_COORDINATION_SETTINGS) diff --git a/src/Coordination/KeeperConstants.cpp b/src/Coordination/KeeperConstants.cpp index 51bf037c1c9..ff26b3171ea 100644 --- a/src/Coordination/KeeperConstants.cpp +++ b/src/Coordination/KeeperConstants.cpp @@ -238,6 +238,13 @@ M(KeeperPacketsReceived) \ M(KeeperRequestTotal) \ M(KeeperLatency) \ + M(KeeperTotalElapsedMicroseconds) \ + M(KeeperProcessElapsedMicroseconds) \ + M(KeeperPreprocessElapsedMicroseconds) \ + M(KeeperStorageLockWaitMicroseconds) \ + M(KeeperCommitWaitElapsedMicroseconds) \ + M(KeeperBatchMaxCount) \ + M(KeeperBatchMaxTotalSize) \ M(KeeperCommits) \ M(KeeperCommitsFailed) \ M(KeeperSnapshotCreations) \ diff --git a/src/Coordination/KeeperDispatcher.cpp b/src/Coordination/KeeperDispatcher.cpp index b4389da082d..925ac9a4efe 100644 --- 
a/src/Coordination/KeeperDispatcher.cpp +++ b/src/Coordination/KeeperDispatcher.cpp @@ -31,6 +31,13 @@ namespace CurrentMetrics extern const Metric KeeperOutstandingRequets; } +namespace ProfileEvents +{ + extern const Event KeeperCommitWaitElapsedMicroseconds; + extern const Event KeeperBatchMaxCount; + extern const Event KeeperBatchMaxTotalSize; +} + using namespace std::chrono_literals; namespace DB @@ -119,6 +126,7 @@ void KeeperDispatcher::requestThread() auto coordination_settings = configuration_and_settings->coordination_settings; uint64_t max_wait = coordination_settings->operation_timeout_ms.totalMilliseconds(); uint64_t max_batch_bytes_size = coordination_settings->max_requests_batch_bytes_size; + size_t max_batch_size = coordination_settings->max_requests_batch_size; /// The code below do a very simple thing: batch all write (quorum) requests into vector until /// previous write batch is not finished or max_batch size achieved. The main complexity goes from @@ -188,7 +196,6 @@ void KeeperDispatcher::requestThread() return false; }; - size_t max_batch_size = coordination_settings->max_requests_batch_size; while (!shutdown_called && current_batch.size() < max_batch_size && !has_reconfig_request && current_batch_bytes_size < max_batch_bytes_size && try_get_request()) ; @@ -225,6 +232,12 @@ void KeeperDispatcher::requestThread() /// Process collected write requests batch if (!current_batch.empty()) { + if (current_batch.size() >= max_batch_size) + ProfileEvents::increment(ProfileEvents::KeeperBatchMaxCount, 1); + + if (current_batch_bytes_size >= max_batch_bytes_size) + ProfileEvents::increment(ProfileEvents::KeeperBatchMaxTotalSize, 1); + LOG_TRACE(log, "Processing requests batch, size: {}, bytes: {}", current_batch.size(), current_batch_bytes_size); auto result = server->putRequestBatch(current_batch); @@ -243,6 +256,8 @@ void KeeperDispatcher::requestThread() /// If we will execute read or reconfig next, we have to process result now if 
(execute_requests_after_write) { + Stopwatch watch; + SCOPE_EXIT(ProfileEvents::increment(ProfileEvents::KeeperCommitWaitElapsedMicroseconds, watch.elapsedMicroseconds())); if (prev_result) result_buf = forceWaitAndProcessResult( prev_result, prev_batch, /*clear_requests_on_success=*/!execute_requests_after_write); diff --git a/src/Coordination/KeeperStateMachine.cpp b/src/Coordination/KeeperStateMachine.cpp index e4d661dfe17..a12d8a50ac3 100644 --- a/src/Coordination/KeeperStateMachine.cpp +++ b/src/Coordination/KeeperStateMachine.cpp @@ -1,12 +1,14 @@ #include #include +#include +#include +#include #include #include -#include #include -#include #include #include +#include #include #include #include @@ -17,7 +19,6 @@ #include #include #include -#include namespace ProfileEvents @@ -31,6 +32,7 @@ namespace ProfileEvents extern const Event KeeperSnapshotApplysFailed; extern const Event KeeperReadSnapshot; extern const Event KeeperSaveSnapshot; + extern const Event KeeperStorageLockWaitMicroseconds; } namespace DB @@ -153,6 +155,14 @@ void assertDigest( } +std::unique_lock KeeperStateMachine::getStorageLock() const +{ + Stopwatch watch; + std::unique_lock lock(storage_and_responses_lock); + ProfileEvents::increment(ProfileEvents::KeeperStorageLockWaitMicroseconds, watch.elapsedMicroseconds()); + return lock; +} + nuraft::ptr KeeperStateMachine::pre_commit(uint64_t log_idx, nuraft::buffer & data) { auto result = nuraft::buffer::alloc(sizeof(log_idx)); @@ -272,7 +282,7 @@ bool KeeperStateMachine::preprocess(const KeeperStorage::RequestForSession & req if (op_num == Coordination::OpNum::SessionID || op_num == Coordination::OpNum::Reconfig) return true; - std::lock_guard lock(storage_and_responses_lock); + auto lock = getStorageLock(); if (storage->isFinalized()) return false; @@ -302,7 +312,7 @@ bool KeeperStateMachine::preprocess(const KeeperStorage::RequestForSession & req void KeeperStateMachine::reconfigure(const KeeperStorage::RequestForSession& request_for_session) 
{ - std::lock_guard _(storage_and_responses_lock); + auto lock = getStorageLock(); KeeperStorage::ResponseForSession response = processReconfiguration(request_for_session); if (!responses_queue.push(response)) { @@ -391,7 +401,7 @@ nuraft::ptr KeeperStateMachine::commit(const uint64_t log_idx, n if (!keeper_context->localLogsPreprocessed() && !preprocess(*request_for_session)) return nullptr; - auto try_push = [this](const KeeperStorage::ResponseForSession& response) + auto try_push = [&](const KeeperStorage::ResponseForSession& response) { if (!responses_queue.push(response)) { @@ -400,6 +410,17 @@ nuraft::ptr KeeperStateMachine::commit(const uint64_t log_idx, n "Failed to push response with session id {} to the queue, probably because of shutdown", response.session_id); } + + using namespace std::chrono; + uint64_t elapsed = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count() - request_for_session->time; + if (elapsed > keeper_context->getCoordinationSettings()->log_slow_total_threshold_ms) + { + LOG_INFO( + log, + "Total time to process a request took too long ({}ms).\nRequest info: {}", + elapsed, + request_for_session->request->toString(/*short_format=*/true)); + } }; try @@ -417,7 +438,7 @@ nuraft::ptr KeeperStateMachine::commit(const uint64_t log_idx, n response_for_session.session_id = -1; response_for_session.response = response; - std::lock_guard lock(storage_and_responses_lock); + auto lock = getStorageLock(); session_id = storage->getSessionID(session_id_request.session_timeout_ms); LOG_DEBUG(log, "Session ID response {} with timeout {}", session_id, session_id_request.session_timeout_ms); response->session_id = session_id; @@ -426,12 +447,13 @@ nuraft::ptr KeeperStateMachine::commit(const uint64_t log_idx, n else { if (op_num == Coordination::OpNum::Close) + { std::lock_guard lock(request_cache_mutex); parsed_request_cache.erase(request_for_session->session_id); } - std::lock_guard lock(storage_and_responses_lock); + auto lock = 
getStorageLock(); KeeperStorage::ResponsesForSessions responses_for_sessions = storage->processRequest(request_for_session->request, request_for_session->session_id, request_for_session->zxid); for (auto & response_for_session : responses_for_sessions) @@ -482,7 +504,7 @@ bool KeeperStateMachine::apply_snapshot(nuraft::snapshot & s) } { /// deserialize and apply snapshot to storage - std::lock_guard lock(storage_and_responses_lock); + auto lock = getStorageLock(); SnapshotDeserializationResult snapshot_deserialization_result; if (latest_snapshot_ptr) @@ -534,7 +556,7 @@ void KeeperStateMachine::rollbackRequest(const KeeperStorage::RequestForSession if (request_for_session.request->getOpNum() == Coordination::OpNum::SessionID) return; - std::lock_guard lock(storage_and_responses_lock); + auto lock = getStorageLock(); storage->rollbackRequest(request_for_session.zxid, allow_missing); } @@ -561,7 +583,7 @@ void KeeperStateMachine::create_snapshot(nuraft::snapshot & s, nuraft::async_res auto snapshot_meta_copy = nuraft::snapshot::deserialize(*snp_buf); CreateSnapshotTask snapshot_task; { /// lock storage for a short period time to turn on "snapshot mode". After that we can read consistent storage state without locking. 
- std::lock_guard lock(storage_and_responses_lock); + auto lock = getStorageLock(); snapshot_task.snapshot = std::make_shared(storage.get(), snapshot_meta_copy, getClusterConfig()); } @@ -623,7 +645,7 @@ void KeeperStateMachine::create_snapshot(nuraft::snapshot & s, nuraft::async_res } { /// Destroy snapshot with lock - std::lock_guard lock(storage_and_responses_lock); + auto lock = getStorageLock(); LOG_TRACE(log, "Clearing garbage after snapshot"); /// Turn off "snapshot mode" and clear outdate part of storage state storage->clearGarbageAfterSnapshot(); @@ -764,7 +786,7 @@ int KeeperStateMachine::read_logical_snp_obj( void KeeperStateMachine::processReadRequest(const KeeperStorage::RequestForSession & request_for_session) { /// Pure local request, just process it with storage - std::lock_guard lock(storage_and_responses_lock); + auto lock = getStorageLock(); auto responses = storage->processRequest( request_for_session.request, request_for_session.session_id, std::nullopt, true /*check_acl*/, true /*is_local*/); for (const auto & response : responses) @@ -774,97 +796,97 @@ void KeeperStateMachine::processReadRequest(const KeeperStorage::RequestForSessi void KeeperStateMachine::shutdownStorage() { - std::lock_guard lock(storage_and_responses_lock); + auto lock = getStorageLock(); storage->finalize(); } std::vector KeeperStateMachine::getDeadSessions() { - std::lock_guard lock(storage_and_responses_lock); + auto lock = getStorageLock(); return storage->getDeadSessions(); } int64_t KeeperStateMachine::getNextZxid() const { - std::lock_guard lock(storage_and_responses_lock); + auto lock = getStorageLock(); return storage->getNextZXID(); } KeeperStorage::Digest KeeperStateMachine::getNodesDigest() const { - std::lock_guard lock(storage_and_responses_lock); + auto lock = getStorageLock(); return storage->getNodesDigest(false); } uint64_t KeeperStateMachine::getLastProcessedZxid() const { - std::lock_guard lock(storage_and_responses_lock); + auto lock = 
getStorageLock(); return storage->getZXID(); } uint64_t KeeperStateMachine::getNodesCount() const { - std::lock_guard lock(storage_and_responses_lock); + auto lock = getStorageLock(); return storage->getNodesCount(); } uint64_t KeeperStateMachine::getTotalWatchesCount() const { - std::lock_guard lock(storage_and_responses_lock); + auto lock = getStorageLock(); return storage->getTotalWatchesCount(); } uint64_t KeeperStateMachine::getWatchedPathsCount() const { - std::lock_guard lock(storage_and_responses_lock); + auto lock = getStorageLock(); return storage->getWatchedPathsCount(); } uint64_t KeeperStateMachine::getSessionsWithWatchesCount() const { - std::lock_guard lock(storage_and_responses_lock); + auto lock = getStorageLock(); return storage->getSessionsWithWatchesCount(); } uint64_t KeeperStateMachine::getTotalEphemeralNodesCount() const { - std::lock_guard lock(storage_and_responses_lock); + auto lock = getStorageLock(); return storage->getTotalEphemeralNodesCount(); } uint64_t KeeperStateMachine::getSessionWithEphemeralNodesCount() const { - std::lock_guard lock(storage_and_responses_lock); + auto lock = getStorageLock(); return storage->getSessionWithEphemeralNodesCount(); } void KeeperStateMachine::dumpWatches(WriteBufferFromOwnString & buf) const { - std::lock_guard lock(storage_and_responses_lock); + auto lock = getStorageLock(); storage->dumpWatches(buf); } void KeeperStateMachine::dumpWatchesByPath(WriteBufferFromOwnString & buf) const { - std::lock_guard lock(storage_and_responses_lock); + auto lock = getStorageLock(); storage->dumpWatchesByPath(buf); } void KeeperStateMachine::dumpSessionsAndEphemerals(WriteBufferFromOwnString & buf) const { - std::lock_guard lock(storage_and_responses_lock); + auto lock = getStorageLock(); storage->dumpSessionsAndEphemerals(buf); } uint64_t KeeperStateMachine::getApproximateDataSize() const { - std::lock_guard lock(storage_and_responses_lock); + auto lock = getStorageLock(); return 
storage->getApproximateDataSize(); } uint64_t KeeperStateMachine::getKeyArenaSize() const { - std::lock_guard lock(storage_and_responses_lock); + auto lock = getStorageLock(); return storage->getArenaDataSize(); } @@ -905,7 +927,7 @@ ClusterConfigPtr KeeperStateMachine::getClusterConfig() const void KeeperStateMachine::recalculateStorageStats() { - std::lock_guard lock(storage_and_responses_lock); + auto lock = getStorageLock(); LOG_INFO(log, "Recalculating storage stats"); storage->recalculateStats(); LOG_INFO(log, "Done recalculating storage stats"); diff --git a/src/Coordination/KeeperStateMachine.h b/src/Coordination/KeeperStateMachine.h index ee6109f0a17..5b166e11569 100644 --- a/src/Coordination/KeeperStateMachine.h +++ b/src/Coordination/KeeperStateMachine.h @@ -131,6 +131,8 @@ public: void reconfigure(const KeeperStorage::RequestForSession& request_for_session); private: + std::unique_lock getStorageLock() const; + CommitCallback commit_callback; /// In our state machine we always have a single snapshot which is stored /// in memory in compressed (serialized) format. @@ -139,7 +141,7 @@ private: nuraft::ptr latest_snapshot_buf = nullptr; /// Main state machine logic - KeeperStoragePtr storage TSA_PT_GUARDED_BY(storage_and_responses_lock); + KeeperStoragePtr storage; /// Save/Load and Serialize/Deserialize logic for snapshots. 
KeeperSnapshotManager snapshot_manager; @@ -183,7 +185,6 @@ private: KeeperSnapshotManagerS3 * snapshot_manager_s3; KeeperStorage::ResponseForSession processReconfiguration( - const KeeperStorage::RequestForSession& request_for_session) - TSA_REQUIRES(storage_and_responses_lock); + const KeeperStorage::RequestForSession& request_for_session); }; } diff --git a/src/Coordination/KeeperStorage.cpp b/src/Coordination/KeeperStorage.cpp index d6225baaf4c..1e53a664d1b 100644 --- a/src/Coordination/KeeperStorage.cpp +++ b/src/Coordination/KeeperStorage.cpp @@ -40,6 +40,8 @@ namespace ProfileEvents extern const Event KeeperGetRequest; extern const Event KeeperListRequest; extern const Event KeeperExistsRequest; + extern const Event KeeperPreprocessElapsedMicroseconds; + extern const Event KeeperProcessElapsedMicroseconds; } namespace DB @@ -2309,6 +2311,20 @@ void KeeperStorage::preprocessRequest( std::optional digest, int64_t log_idx) { + Stopwatch watch; + SCOPE_EXIT({ + auto elapsed = watch.elapsedMicroseconds(); + if (auto elapsed_ms = elapsed / 1000; elapsed_ms > keeper_context->getCoordinationSettings()->log_slow_cpu_threshold_ms) + { + LOG_INFO( + getLogger("KeeperStorage"), + "Preprocessing a request took too long ({}ms).\nRequest info: {}", + elapsed_ms, + zk_request->toString(/*short_format=*/true)); + } + ProfileEvents::increment(ProfileEvents::KeeperPreprocessElapsedMicroseconds, watch.elapsedMicroseconds()); + }); + if (!initialized) throw Exception(ErrorCodes::LOGICAL_ERROR, "KeeperStorage system nodes are not initialized"); @@ -2409,6 +2425,20 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest( bool check_acl, bool is_local) { + Stopwatch watch; + SCOPE_EXIT({ + auto elapsed = watch.elapsedMicroseconds(); + if (auto elapsed_ms = elapsed / 1000; elapsed_ms > keeper_context->getCoordinationSettings()->log_slow_cpu_threshold_ms) + { + LOG_INFO( + getLogger("KeeperStorage"), + "Processing a request took too long ({}ms).\nRequest info: {}", + 
elapsed_ms, + zk_request->toString(/*short_format=*/true)); + } + ProfileEvents::increment(ProfileEvents::KeeperProcessElapsedMicroseconds, watch.elapsedMicroseconds()); + }); + if (!initialized) throw Exception(ErrorCodes::LOGICAL_ERROR, "KeeperStorage system nodes are not initialized"); diff --git a/src/Coordination/SnapshotableHashTable.h b/src/Coordination/SnapshotableHashTable.h index 70858930115..5f2b14e17b0 100644 --- a/src/Coordination/SnapshotableHashTable.h +++ b/src/Coordination/SnapshotableHashTable.h @@ -3,6 +3,7 @@ #include #include +#include namespace DB { diff --git a/src/Server/KeeperTCPHandler.cpp b/src/Server/KeeperTCPHandler.cpp index 4612e2e9fa8..47064b467e7 100644 --- a/src/Server/KeeperTCPHandler.cpp +++ b/src/Server/KeeperTCPHandler.cpp @@ -13,11 +13,9 @@ #include #include #include -#include #include #include #include -#include #include #include #include @@ -30,6 +28,11 @@ #include #endif +namespace ProfileEvents +{ + extern const Event KeeperTotalElapsedMicroseconds; +} + namespace DB { @@ -411,12 +414,12 @@ void KeeperTCPHandler::runImpl() keeper_dispatcher->registerSession(session_id, response_callback); Stopwatch logging_stopwatch; + auto operation_max_ms = keeper_dispatcher->getKeeperContext()->getCoordinationSettings()->log_slow_connection_operation_threshold_ms; auto log_long_operation = [&](const String & operation) { - constexpr UInt64 operation_max_ms = 500; auto elapsed_ms = logging_stopwatch.elapsedMilliseconds(); if (operation_max_ms < elapsed_ms) - LOG_TEST(log, "{} for session {} took {} ms", operation, session_id, elapsed_ms); + LOG_INFO(log, "{} for session {} took {} ms", operation, session_id, elapsed_ms); logging_stopwatch.restart(); }; @@ -611,11 +614,13 @@ void KeeperTCPHandler::updateStats(Coordination::ZooKeeperResponsePtr & response /// update statistics ignoring watch response and heartbeat. 
if (response->xid != Coordination::WATCH_XID && response->getOpNum() != Coordination::OpNum::Heartbeat) { - Int64 elapsed = (Poco::Timestamp() - operations[response->xid]) / 1000; - conn_stats.updateLatency(elapsed); + Int64 elapsed = (Poco::Timestamp() - operations[response->xid]); + ProfileEvents::increment(ProfileEvents::KeeperTotalElapsedMicroseconds, elapsed); + Int64 elapsed_ms = elapsed / 1000; + conn_stats.updateLatency(elapsed_ms); operations.erase(response->xid); - keeper_dispatcher->updateKeeperStatLatency(elapsed); + keeper_dispatcher->updateKeeperStatLatency(elapsed_ms); last_op.set(std::make_unique(LastOp{ .name = Coordination::toString(response->getOpNum()), From 7361407ea50e6cc22ced76527634b3404a99642c Mon Sep 17 00:00:00 2001 From: Antonio Andelic Date: Mon, 1 Jul 2024 15:25:00 +0200 Subject: [PATCH 02/10] Fix test --- src/Coordination/CoordinationSettings.cpp | 17 +++++++++++++++++ .../test_keeper_four_word_command/test.py | 10 ++++++++++ 2 files changed, 27 insertions(+) diff --git a/src/Coordination/CoordinationSettings.cpp b/src/Coordination/CoordinationSettings.cpp index 05f691ca76b..d72d39fd7e1 100644 --- a/src/Coordination/CoordinationSettings.cpp +++ b/src/Coordination/CoordinationSettings.cpp @@ -169,6 +169,23 @@ void KeeperConfigurationAndSettings::dump(WriteBufferFromOwnString & buf) const writeText("async_replication=", buf); write_bool(coordination_settings->async_replication); + + writeText("latest_logs_cache_size_threshold=", buf); + write_int(coordination_settings->latest_logs_cache_size_threshold); + writeText("commit_logs_cache_size_threshold=", buf); + write_int(coordination_settings->commit_logs_cache_size_threshold); + + writeText("disk_move_retries_wait_ms=", buf); + write_int(coordination_settings->disk_move_retries_wait_ms); + writeText("disk_move_retries_during_init=", buf); + write_int(coordination_settings->disk_move_retries_during_init); + + writeText("log_slow_total_threshold_ms=", buf); + 
write_int(coordination_settings->log_slow_total_threshold_ms); + writeText("log_slow_cpu_threshold_ms=", buf); + write_int(coordination_settings->log_slow_cpu_threshold_ms); + writeText("log_slow_connection_operation_threshold_ms=", buf); + write_int(coordination_settings->log_slow_connection_operation_threshold_ms); } KeeperConfigurationAndSettingsPtr diff --git a/tests/integration/test_keeper_four_word_command/test.py b/tests/integration/test_keeper_four_word_command/test.py index 44b2b50673a..a3a059c1dcb 100644 --- a/tests/integration/test_keeper_four_word_command/test.py +++ b/tests/integration/test_keeper_four_word_command/test.py @@ -293,6 +293,16 @@ def test_cmd_conf(started_cluster): assert result["configuration_change_tries_count"] == "20" assert result["async_replication"] == "true" + + assert result["latest_logs_cache_size_threshold"] == "1073741824" + assert result["commit_logs_cache_size_threshold"] == "524288000" + + assert result["disk_move_retries_wait_ms"] == "1000" + assert result["disk_move_retries_during_init"] == "100" + + assert result["log_slow_total_threshold_ms"] == "5000" + assert result["log_slow_cpu_threshold_ms"] == "5000" + assert result["log_slow_connection_operation_threshold_ms"] == "1000" finally: close_keeper_socket(client) From 447c0db2bc1db2706598a5fe0eb81332b3e4f0ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Mar=C3=ADn?= Date: Mon, 1 Jul 2024 20:11:19 +0200 Subject: [PATCH 03/10] Fix SettingsChangesHistory 24.7 --- src/Core/SettingsChangesHistory.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 194a0024f2b..65efc157741 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -58,6 +58,7 @@ String ClickHouseVersion::toString() const static std::initializer_list> settings_changes_history_initializer = { {"24.7", {{"output_format_parquet_write_page_index", false, true, "Add a 
possibility to write page index into parquet files."}, + {"input_format_json_ignore_key_case", false, false, "Ignore json key case while read json field from string."}, }}, {"24.6", {{"materialize_skip_indexes_on_insert", true, true, "Added new setting to allow to disable materialization of skip indexes on insert"}, {"materialize_statistics_on_insert", true, true, "Added new setting to allow to disable materialization of statistics on insert"}, @@ -88,7 +89,6 @@ static std::initializer_list Date: Tue, 2 Jul 2024 10:50:21 +0200 Subject: [PATCH 04/10] Fixes --- src/Coordination/CoordinationSettings.h | 2 +- src/Coordination/KeeperStateMachine.cpp | 74 +++++++++++++------------ src/Coordination/KeeperStateMachine.h | 8 +-- src/Coordination/KeeperStorage.cpp | 4 +- 4 files changed, 46 insertions(+), 42 deletions(-) diff --git a/src/Coordination/CoordinationSettings.h b/src/Coordination/CoordinationSettings.h index e7ae1f86d2e..6e23a56ef97 100644 --- a/src/Coordination/CoordinationSettings.h +++ b/src/Coordination/CoordinationSettings.h @@ -60,7 +60,7 @@ struct Settings; M(UInt64, disk_move_retries_wait_ms, 1000, "How long to wait between retries after a failure which happened while a file was being moved between disks.", 0) \ M(UInt64, disk_move_retries_during_init, 100, "The amount of retries after a failure which happened while a file was being moved between disks during initialization.", 0) \ M(UInt64, log_slow_total_threshold_ms, 5000, "Requests for which the total latency is larger than this settings will be logged", 0) \ - M(UInt64, log_slow_cpu_threshold_ms, 5000, "Requests for which the CPU (preprocessing and processing) latency is larger than this settings will be logged", 0) \ + M(UInt64, log_slow_cpu_threshold_ms, 100, "Requests for which the CPU (preprocessing and processing) latency is larger than this settings will be logged", 0) \ M(UInt64, log_slow_connection_operation_threshold_ms, 1000, "Log message if a certain operation took too long inside a single 
connection", 0) DECLARE_SETTINGS_TRAITS(CoordinationSettingsTraits, LIST_OF_COORDINATION_SETTINGS) diff --git a/src/Coordination/KeeperStateMachine.cpp b/src/Coordination/KeeperStateMachine.cpp index a12d8a50ac3..88f708ab4ae 100644 --- a/src/Coordination/KeeperStateMachine.cpp +++ b/src/Coordination/KeeperStateMachine.cpp @@ -153,14 +153,20 @@ void assertDigest( } } -} - -std::unique_lock KeeperStateMachine::getStorageLock() const +struct TSA_SCOPED_LOCKABLE LockGuardWithStats final { - Stopwatch watch; - std::unique_lock lock(storage_and_responses_lock); - ProfileEvents::increment(ProfileEvents::KeeperStorageLockWaitMicroseconds, watch.elapsedMicroseconds()); - return lock; + std::unique_lock lock; + explicit LockGuardWithStats(std::mutex & mutex) TSA_ACQUIRE(mutex) + { + Stopwatch watch; + std::unique_lock l(mutex); + ProfileEvents::increment(ProfileEvents::KeeperStorageLockWaitMicroseconds, watch.elapsedMicroseconds()); + lock = std::move(l); + } + + ~LockGuardWithStats() TSA_RELEASE() = default; +}; + } nuraft::ptr KeeperStateMachine::pre_commit(uint64_t log_idx, nuraft::buffer & data) @@ -282,7 +288,7 @@ bool KeeperStateMachine::preprocess(const KeeperStorage::RequestForSession & req if (op_num == Coordination::OpNum::SessionID || op_num == Coordination::OpNum::Reconfig) return true; - auto lock = getStorageLock(); + LockGuardWithStats lock(storage_and_responses_lock); if (storage->isFinalized()) return false; @@ -312,7 +318,7 @@ bool KeeperStateMachine::preprocess(const KeeperStorage::RequestForSession & req void KeeperStateMachine::reconfigure(const KeeperStorage::RequestForSession& request_for_session) { - auto lock = getStorageLock(); + LockGuardWithStats lock(storage_and_responses_lock); KeeperStorage::ResponseForSession response = processReconfiguration(request_for_session); if (!responses_queue.push(response)) { @@ -412,7 +418,7 @@ nuraft::ptr KeeperStateMachine::commit(const uint64_t log_idx, n } using namespace std::chrono; - uint64_t elapsed = 
request_for_session->time - duration_cast(system_clock::now().time_since_epoch()).count(); + uint64_t elapsed = duration_cast(system_clock::now().time_since_epoch()).count() - request_for_session->time; if (elapsed > keeper_context->getCoordinationSettings()->log_slow_total_threshold_ms) { LOG_INFO( @@ -438,7 +444,7 @@ nuraft::ptr KeeperStateMachine::commit(const uint64_t log_idx, n response_for_session.session_id = -1; response_for_session.response = response; - auto lock = getStorageLock(); + LockGuardWithStats lock(storage_and_responses_lock); session_id = storage->getSessionID(session_id_request.session_timeout_ms); LOG_DEBUG(log, "Session ID response {} with timeout {}", session_id, session_id_request.session_timeout_ms); response->session_id = session_id; @@ -453,7 +459,7 @@ nuraft::ptr KeeperStateMachine::commit(const uint64_t log_idx, n parsed_request_cache.erase(request_for_session->session_id); } - auto lock = getStorageLock(); + LockGuardWithStats lock(storage_and_responses_lock); KeeperStorage::ResponsesForSessions responses_for_sessions = storage->processRequest(request_for_session->request, request_for_session->session_id, request_for_session->zxid); for (auto & response_for_session : responses_for_sessions) @@ -504,7 +510,7 @@ bool KeeperStateMachine::apply_snapshot(nuraft::snapshot & s) } { /// deserialize and apply snapshot to storage - auto lock = getStorageLock(); + LockGuardWithStats lock(storage_and_responses_lock); SnapshotDeserializationResult snapshot_deserialization_result; if (latest_snapshot_ptr) @@ -556,7 +562,7 @@ void KeeperStateMachine::rollbackRequest(const KeeperStorage::RequestForSession if (request_for_session.request->getOpNum() == Coordination::OpNum::SessionID) return; - auto lock = getStorageLock(); + LockGuardWithStats lock(storage_and_responses_lock); storage->rollbackRequest(request_for_session.zxid, allow_missing); } @@ -583,7 +589,7 @@ void KeeperStateMachine::create_snapshot(nuraft::snapshot & s, nuraft::async_res auto 
snapshot_meta_copy = nuraft::snapshot::deserialize(*snp_buf); CreateSnapshotTask snapshot_task; { /// lock storage for a short period time to turn on "snapshot mode". After that we can read consistent storage state without locking. - auto lock = getStorageLock(); + LockGuardWithStats lock(storage_and_responses_lock); snapshot_task.snapshot = std::make_shared(storage.get(), snapshot_meta_copy, getClusterConfig()); } @@ -645,7 +651,7 @@ void KeeperStateMachine::create_snapshot(nuraft::snapshot & s, nuraft::async_res } { /// Destroy snapshot with lock - auto lock = getStorageLock(); + LockGuardWithStats lock(storage_and_responses_lock); LOG_TRACE(log, "Clearing garbage after snapshot"); /// Turn off "snapshot mode" and clear outdate part of storage state storage->clearGarbageAfterSnapshot(); @@ -786,7 +792,7 @@ int KeeperStateMachine::read_logical_snp_obj( void KeeperStateMachine::processReadRequest(const KeeperStorage::RequestForSession & request_for_session) { /// Pure local request, just process it with storage - auto lock = getStorageLock(); + LockGuardWithStats lock(storage_and_responses_lock); auto responses = storage->processRequest( request_for_session.request, request_for_session.session_id, std::nullopt, true /*check_acl*/, true /*is_local*/); for (const auto & response : responses) @@ -796,97 +802,97 @@ void KeeperStateMachine::processReadRequest(const KeeperStorage::RequestForSessi void KeeperStateMachine::shutdownStorage() { - auto lock = getStorageLock(); + LockGuardWithStats lock(storage_and_responses_lock); storage->finalize(); } std::vector KeeperStateMachine::getDeadSessions() { - auto lock = getStorageLock(); + LockGuardWithStats lock(storage_and_responses_lock); return storage->getDeadSessions(); } int64_t KeeperStateMachine::getNextZxid() const { - auto lock = getStorageLock(); + LockGuardWithStats lock(storage_and_responses_lock); return storage->getNextZXID(); } KeeperStorage::Digest KeeperStateMachine::getNodesDigest() const { - auto lock = 
getStorageLock(); + LockGuardWithStats lock(storage_and_responses_lock); return storage->getNodesDigest(false); } uint64_t KeeperStateMachine::getLastProcessedZxid() const { - auto lock = getStorageLock(); + LockGuardWithStats lock(storage_and_responses_lock); return storage->getZXID(); } uint64_t KeeperStateMachine::getNodesCount() const { - auto lock = getStorageLock(); + LockGuardWithStats lock(storage_and_responses_lock); return storage->getNodesCount(); } uint64_t KeeperStateMachine::getTotalWatchesCount() const { - auto lock = getStorageLock(); + LockGuardWithStats lock(storage_and_responses_lock); return storage->getTotalWatchesCount(); } uint64_t KeeperStateMachine::getWatchedPathsCount() const { - auto lock = getStorageLock(); + LockGuardWithStats lock(storage_and_responses_lock); return storage->getWatchedPathsCount(); } uint64_t KeeperStateMachine::getSessionsWithWatchesCount() const { - auto lock = getStorageLock(); + LockGuardWithStats lock(storage_and_responses_lock); return storage->getSessionsWithWatchesCount(); } uint64_t KeeperStateMachine::getTotalEphemeralNodesCount() const { - auto lock = getStorageLock(); + LockGuardWithStats lock(storage_and_responses_lock); return storage->getTotalEphemeralNodesCount(); } uint64_t KeeperStateMachine::getSessionWithEphemeralNodesCount() const { - auto lock = getStorageLock(); + LockGuardWithStats lock(storage_and_responses_lock); return storage->getSessionWithEphemeralNodesCount(); } void KeeperStateMachine::dumpWatches(WriteBufferFromOwnString & buf) const { - auto lock = getStorageLock(); + LockGuardWithStats lock(storage_and_responses_lock); storage->dumpWatches(buf); } void KeeperStateMachine::dumpWatchesByPath(WriteBufferFromOwnString & buf) const { - auto lock = getStorageLock(); + LockGuardWithStats lock(storage_and_responses_lock); storage->dumpWatchesByPath(buf); } void KeeperStateMachine::dumpSessionsAndEphemerals(WriteBufferFromOwnString & buf) const { - auto lock = getStorageLock(); + 
LockGuardWithStats lock(storage_and_responses_lock); storage->dumpSessionsAndEphemerals(buf); } uint64_t KeeperStateMachine::getApproximateDataSize() const { - auto lock = getStorageLock(); + LockGuardWithStats lock(storage_and_responses_lock); return storage->getApproximateDataSize(); } uint64_t KeeperStateMachine::getKeyArenaSize() const { - auto lock = getStorageLock(); + LockGuardWithStats lock(storage_and_responses_lock); return storage->getArenaDataSize(); } @@ -927,7 +933,7 @@ ClusterConfigPtr KeeperStateMachine::getClusterConfig() const void KeeperStateMachine::recalculateStorageStats() { - auto lock = getStorageLock(); + LockGuardWithStats lock(storage_and_responses_lock); LOG_INFO(log, "Recalculating storage stats"); storage->recalculateStats(); LOG_INFO(log, "Done recalculating storage stats"); diff --git a/src/Coordination/KeeperStateMachine.h b/src/Coordination/KeeperStateMachine.h index 5b166e11569..7ea14aa2d30 100644 --- a/src/Coordination/KeeperStateMachine.h +++ b/src/Coordination/KeeperStateMachine.h @@ -131,8 +131,6 @@ public: void reconfigure(const KeeperStorage::RequestForSession& request_for_session); private: - std::unique_lock getStorageLock() const; - CommitCallback commit_callback; /// In our state machine we always have a single snapshot which is stored /// in memory in compressed (serialized) format. @@ -141,7 +139,7 @@ private: nuraft::ptr latest_snapshot_buf = nullptr; /// Main state machine logic - KeeperStoragePtr storage; + KeeperStoragePtr storage TSA_PT_GUARDED_BY(storage_and_responses_lock); /// Save/Load and Serialize/Deserialize logic for snapshots. 
KeeperSnapshotManager snapshot_manager; @@ -184,7 +182,7 @@ private: KeeperSnapshotManagerS3 * snapshot_manager_s3; - KeeperStorage::ResponseForSession processReconfiguration( - const KeeperStorage::RequestForSession& request_for_session); + KeeperStorage::ResponseForSession processReconfiguration(const KeeperStorage::RequestForSession & request_for_session) + TSA_REQUIRES(storage_and_responses_lock); }; } diff --git a/src/Coordination/KeeperStorage.cpp b/src/Coordination/KeeperStorage.cpp index 1e53a664d1b..1542eb0d71a 100644 --- a/src/Coordination/KeeperStorage.cpp +++ b/src/Coordination/KeeperStorage.cpp @@ -2322,7 +2322,7 @@ void KeeperStorage::preprocessRequest( elapsed_ms, zk_request->toString(/*short_format=*/true)); } - ProfileEvents::increment(ProfileEvents::KeeperPreprocessElapsedMicroseconds, watch.elapsedMicroseconds()); + ProfileEvents::increment(ProfileEvents::KeeperPreprocessElapsedMicroseconds, elapsed); }); if (!initialized) @@ -2436,7 +2436,7 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest( elapsed_ms, zk_request->toString(/*short_format=*/true)); } - ProfileEvents::increment(ProfileEvents::KeeperProcessElapsedMicroseconds, watch.elapsedMicroseconds()); + ProfileEvents::increment(ProfileEvents::KeeperProcessElapsedMicroseconds, elapsed); }); if (!initialized) From f2c06becd5fb64d1075a7a327d990f44eeb12b2d Mon Sep 17 00:00:00 2001 From: kssenii Date: Tue, 2 Jul 2024 13:17:28 +0200 Subject: [PATCH 05/10] Fix race in s3queue --- src/Storages/ObjectStorageQueue/ObjectStorageQueueSource.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/Storages/ObjectStorageQueue/ObjectStorageQueueSource.cpp b/src/Storages/ObjectStorageQueue/ObjectStorageQueueSource.cpp index 955e49bc2bf..1939ea0a66f 100644 --- a/src/Storages/ObjectStorageQueue/ObjectStorageQueueSource.cpp +++ b/src/Storages/ObjectStorageQueue/ObjectStorageQueueSource.cpp @@ -111,10 +111,12 @@ void ObjectStorageQueueSource::FileIterator::returnForRetry(Source::ObjectInfoPt 
if (metadata->useBucketsForProcessing()) { const auto bucket = metadata->getBucketForPath(object_info->relative_path); + std::lock_guard lock(mutex); listed_keys_cache[bucket].keys.emplace_front(object_info); } else { + std::lock_guard lock(mutex); objects_to_retry.push_back(object_info); } } From 94dba17ad92d6982e5e681113e97eb39099ff696 Mon Sep 17 00:00:00 2001 From: Antonio Andelic Date: Tue, 2 Jul 2024 13:26:52 +0200 Subject: [PATCH 06/10] Fix test --- tests/integration/test_keeper_four_word_command/test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_keeper_four_word_command/test.py b/tests/integration/test_keeper_four_word_command/test.py index a3a059c1dcb..83503122729 100644 --- a/tests/integration/test_keeper_four_word_command/test.py +++ b/tests/integration/test_keeper_four_word_command/test.py @@ -301,7 +301,7 @@ def test_cmd_conf(started_cluster): assert result["disk_move_retries_during_init"] == "100" assert result["log_slow_total_threshold_ms"] == "5000" - assert result["log_slow_cpu_threshold_ms"] == "5000" + assert result["log_slow_cpu_threshold_ms"] == "100" assert result["log_slow_connection_operation_threshold_ms"] == "1000" finally: close_keeper_socket(client) From 4680489c374248684feb0ea6ad78fcea4e81d7b0 Mon Sep 17 00:00:00 2001 From: Peignon Melvyn Date: Wed, 3 Jul 2024 00:31:50 +0200 Subject: [PATCH 07/10] Improve messaging around the JSON object datatype - Changed the title - Improve messaging around the future of this feature --- .../data-types/{json.md => object-data-type.md} | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) rename docs/en/sql-reference/data-types/{json.md => object-data-type.md} (79%) diff --git a/docs/en/sql-reference/data-types/json.md b/docs/en/sql-reference/data-types/object-data-type.md similarity index 79% rename from docs/en/sql-reference/data-types/json.md rename to docs/en/sql-reference/data-types/object-data-type.md index 39e37abad82..0a3f780569f 100644 
--- a/docs/en/sql-reference/data-types/json.md +++ b/docs/en/sql-reference/data-types/object-data-type.md @@ -1,24 +1,19 @@ --- -slug: /en/sql-reference/data-types/json +slug: /en/sql-reference/data-types/object-data-type sidebar_position: 26 -sidebar_label: JSON +sidebar_label: Object Data Type --- -# JSON +# Object Data Type :::note -This feature is experimental and is not production-ready. If you need to work with JSON documents, consider using [this guide](/docs/en/integrations/data-ingestion/data-formats/json.md) instead. +This feature is not production-ready and is now deprecated. If you need to work with JSON documents, consider using [this guide](/docs/en/integrations/data-ingestion/data-formats/json.md) instead. A new implementation to support JSON object is in progress and can be tracked [here](https://github.com/ClickHouse/ClickHouse/issues/54864) ::: Stores JavaScript Object Notation (JSON) documents in a single column. `JSON` is an alias for `Object('json')`. -:::note -The JSON data type is an obsolete feature. Do not use it. -If you want to use it, set `allow_experimental_object_type = 1`. 
-::: - ## Example **Example 1** From e06d1d81603e10453ef42e38e14bb33bdd6939e5 Mon Sep 17 00:00:00 2001 From: Justin de Guzman Date: Tue, 2 Jul 2024 17:49:42 -0700 Subject: [PATCH 08/10] Formatting --- docs/en/sql-reference/data-types/object-data-type.md | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/docs/en/sql-reference/data-types/object-data-type.md b/docs/en/sql-reference/data-types/object-data-type.md index 0a3f780569f..cb9c0810c84 100644 --- a/docs/en/sql-reference/data-types/object-data-type.md +++ b/docs/en/sql-reference/data-types/object-data-type.md @@ -2,6 +2,7 @@ slug: /en/sql-reference/data-types/object-data-type sidebar_position: 26 sidebar_label: Object Data Type +keywords: [object, data type] --- # Object Data Type @@ -44,7 +45,7 @@ SELECT o.a, o.b.c, o.b.d[3] FROM json **Example 2** -To be able to create an ordered `MergeTree` family table the sorting key has to be extracted into its column. For example, to insert a file of compressed HTTP access logs in JSON format: +To be able to create an ordered `MergeTree` family table, the sorting key has to be extracted into its column. For example, to insert a file of compressed HTTP access logs in JSON format: ```sql CREATE TABLE logs @@ -64,7 +65,7 @@ FROM file('access.json.gz', JSONAsString) ## Displaying JSON columns -When displaying a `JSON` column ClickHouse only shows the field values by default (because internally, it is represented as a tuple). You can display the field names as well by setting `output_format_json_named_tuples_as_objects = 1`: +When displaying a `JSON` column, ClickHouse only shows the field values by default (because internally, it is represented as a tuple). 
You can also display the field names by setting `output_format_json_named_tuples_as_objects = 1`: ```sql SET output_format_json_named_tuples_as_objects = 1 @@ -78,4 +79,5 @@ SELECT * FROM json FORMAT JSONEachRow ## Related Content +- [Using JSON in ClickHouse](/docs/en/integrations/data-formats/json) - [Getting Data Into ClickHouse - Part 2 - A JSON detour](https://clickhouse.com/blog/getting-data-into-clickhouse-part-2-json) From ca49cbafd96cfd0972a859c369f98265a84959f3 Mon Sep 17 00:00:00 2001 From: Justin de Guzman Date: Tue, 2 Jul 2024 17:50:06 -0700 Subject: [PATCH 09/10] Fix link --- docs/en/sql-reference/data-types/object-data-type.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/sql-reference/data-types/object-data-type.md b/docs/en/sql-reference/data-types/object-data-type.md index cb9c0810c84..c29be2cff58 100644 --- a/docs/en/sql-reference/data-types/object-data-type.md +++ b/docs/en/sql-reference/data-types/object-data-type.md @@ -8,7 +8,7 @@ keywords: [object, data type] # Object Data Type :::note -This feature is not production-ready and is now deprecated. If you need to work with JSON documents, consider using [this guide](/docs/en/integrations/data-ingestion/data-formats/json.md) instead. A new implementation to support JSON object is in progress and can be tracked [here](https://github.com/ClickHouse/ClickHouse/issues/54864) +This feature is not production-ready and is now deprecated. If you need to work with JSON documents, consider using [this guide](/docs/en/integrations/data-ingestion/data-formats/json) instead. A new implementation to support JSON object is in progress and can be tracked [here](https://github.com/ClickHouse/ClickHouse/issues/54864) ::: Stores JavaScript Object Notation (JSON) documents in a single column. 
From 67aca82d9e3565463588b942cf73581303dd47f2 Mon Sep 17 00:00:00 2001 From: Justin de Guzman Date: Tue, 2 Jul 2024 18:09:40 -0700 Subject: [PATCH 10/10] Revert file name change (changing slug is sufficient) --- docs/en/sql-reference/data-types/{object-data-type.md => json.md} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename docs/en/sql-reference/data-types/{object-data-type.md => json.md} (100%) diff --git a/docs/en/sql-reference/data-types/object-data-type.md b/docs/en/sql-reference/data-types/json.md similarity index 100% rename from docs/en/sql-reference/data-types/object-data-type.md rename to docs/en/sql-reference/data-types/json.md