Merge pull request #65918 from ClickHouse/add-some-tools-for-keeper-profiling

Add extra profiling helpers for Keeper
This commit is contained in:
Antonio Andelic 2024-07-03 07:01:13 +00:00 committed by GitHub
commit e3beff45c1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 198 additions and 74 deletions

View File

@ -611,6 +611,13 @@ The server successfully detected this situation and will download merged part fr
M(KeeperPacketsReceived, "Packets received by keeper server") \ M(KeeperPacketsReceived, "Packets received by keeper server") \
M(KeeperRequestTotal, "Total requests number on keeper server") \ M(KeeperRequestTotal, "Total requests number on keeper server") \
M(KeeperLatency, "Keeper latency") \ M(KeeperLatency, "Keeper latency") \
M(KeeperTotalElapsedMicroseconds, "Keeper total latency for a single request") \
M(KeeperProcessElapsedMicroseconds, "Keeper commit latency for a single request") \
M(KeeperPreprocessElapsedMicroseconds, "Keeper preprocessing latency for a single reuquest") \
M(KeeperStorageLockWaitMicroseconds, "Time spent waiting for acquiring Keeper storage lock") \
M(KeeperCommitWaitElapsedMicroseconds, "Time spent waiting for certain log to be committed") \
M(KeeperBatchMaxCount, "Number of times the size of batch was limited by the amount") \
M(KeeperBatchMaxTotalSize, "Number of times the size of batch was limited by the total bytes size") \
M(KeeperCommits, "Number of successful commits") \ M(KeeperCommits, "Number of successful commits") \
M(KeeperCommitsFailed, "Number of failed commits") \ M(KeeperCommitsFailed, "Number of failed commits") \
M(KeeperSnapshotCreations, "Number of snapshots creations")\ M(KeeperSnapshotCreations, "Number of snapshots creations")\

View File

@ -9,7 +9,6 @@
#include <IO/ReadHelpers.h> #include <IO/ReadHelpers.h>
#include <fmt/format.h> #include <fmt/format.h>
#include <Common/logger_useful.h> #include <Common/logger_useful.h>
#include <array>
namespace Coordination namespace Coordination
@ -29,7 +28,7 @@ void ZooKeeperResponse::write(WriteBuffer & out) const
Coordination::write(buf.str(), out); Coordination::write(buf.str(), out);
} }
std::string ZooKeeperRequest::toString() const std::string ZooKeeperRequest::toString(bool short_format) const
{ {
return fmt::format( return fmt::format(
"XID = {}\n" "XID = {}\n"
@ -37,7 +36,7 @@ std::string ZooKeeperRequest::toString() const
"Additional info:\n{}", "Additional info:\n{}",
xid, xid,
getOpNum(), getOpNum(),
toStringImpl()); toStringImpl(short_format));
} }
void ZooKeeperRequest::write(WriteBuffer & out) const void ZooKeeperRequest::write(WriteBuffer & out) const
@ -60,7 +59,7 @@ void ZooKeeperSyncRequest::readImpl(ReadBuffer & in)
Coordination::read(path, in); Coordination::read(path, in);
} }
std::string ZooKeeperSyncRequest::toStringImpl() const std::string ZooKeeperSyncRequest::toStringImpl(bool /*short_format*/) const
{ {
return fmt::format("path = {}", path); return fmt::format("path = {}", path);
} }
@ -91,7 +90,7 @@ void ZooKeeperReconfigRequest::readImpl(ReadBuffer & in)
Coordination::read(version, in); Coordination::read(version, in);
} }
std::string ZooKeeperReconfigRequest::toStringImpl() const std::string ZooKeeperReconfigRequest::toStringImpl(bool /*short_format*/) const
{ {
return fmt::format( return fmt::format(
"joining = {}\nleaving = {}\nnew_members = {}\nversion = {}", "joining = {}\nleaving = {}\nnew_members = {}\nversion = {}",
@ -145,7 +144,7 @@ void ZooKeeperAuthRequest::readImpl(ReadBuffer & in)
Coordination::read(data, in); Coordination::read(data, in);
} }
std::string ZooKeeperAuthRequest::toStringImpl() const std::string ZooKeeperAuthRequest::toStringImpl(bool /*short_format*/) const
{ {
return fmt::format( return fmt::format(
"type = {}\n" "type = {}\n"
@ -191,7 +190,7 @@ void ZooKeeperCreateRequest::readImpl(ReadBuffer & in)
is_sequential = true; is_sequential = true;
} }
std::string ZooKeeperCreateRequest::toStringImpl() const std::string ZooKeeperCreateRequest::toStringImpl(bool /*short_format*/) const
{ {
return fmt::format( return fmt::format(
"path = {}\n" "path = {}\n"
@ -218,7 +217,7 @@ void ZooKeeperRemoveRequest::writeImpl(WriteBuffer & out) const
Coordination::write(version, out); Coordination::write(version, out);
} }
std::string ZooKeeperRemoveRequest::toStringImpl() const std::string ZooKeeperRemoveRequest::toStringImpl(bool /*short_format*/) const
{ {
return fmt::format( return fmt::format(
"path = {}\n" "path = {}\n"
@ -245,7 +244,7 @@ void ZooKeeperExistsRequest::readImpl(ReadBuffer & in)
Coordination::read(has_watch, in); Coordination::read(has_watch, in);
} }
std::string ZooKeeperExistsRequest::toStringImpl() const std::string ZooKeeperExistsRequest::toStringImpl(bool /*short_format*/) const
{ {
return fmt::format("path = {}", path); return fmt::format("path = {}", path);
} }
@ -272,7 +271,7 @@ void ZooKeeperGetRequest::readImpl(ReadBuffer & in)
Coordination::read(has_watch, in); Coordination::read(has_watch, in);
} }
std::string ZooKeeperGetRequest::toStringImpl() const std::string ZooKeeperGetRequest::toStringImpl(bool /*short_format*/) const
{ {
return fmt::format("path = {}", path); return fmt::format("path = {}", path);
} }
@ -303,7 +302,7 @@ void ZooKeeperSetRequest::readImpl(ReadBuffer & in)
Coordination::read(version, in); Coordination::read(version, in);
} }
std::string ZooKeeperSetRequest::toStringImpl() const std::string ZooKeeperSetRequest::toStringImpl(bool /*short_format*/) const
{ {
return fmt::format( return fmt::format(
"path = {}\n" "path = {}\n"
@ -334,7 +333,7 @@ void ZooKeeperListRequest::readImpl(ReadBuffer & in)
Coordination::read(has_watch, in); Coordination::read(has_watch, in);
} }
std::string ZooKeeperListRequest::toStringImpl() const std::string ZooKeeperListRequest::toStringImpl(bool /*short_format*/) const
{ {
return fmt::format("path = {}", path); return fmt::format("path = {}", path);
} }
@ -356,7 +355,7 @@ void ZooKeeperFilteredListRequest::readImpl(ReadBuffer & in)
list_request_type = static_cast<ListRequestType>(read_request_type); list_request_type = static_cast<ListRequestType>(read_request_type);
} }
std::string ZooKeeperFilteredListRequest::toStringImpl() const std::string ZooKeeperFilteredListRequest::toStringImpl(bool /*short_format*/) const
{ {
return fmt::format( return fmt::format(
"path = {}\n" "path = {}\n"
@ -401,7 +400,7 @@ void ZooKeeperSetACLRequest::readImpl(ReadBuffer & in)
Coordination::read(version, in); Coordination::read(version, in);
} }
std::string ZooKeeperSetACLRequest::toStringImpl() const std::string ZooKeeperSetACLRequest::toStringImpl(bool /*short_format*/) const
{ {
return fmt::format("path = {}\nversion = {}", path, version); return fmt::format("path = {}\nversion = {}", path, version);
} }
@ -426,7 +425,7 @@ void ZooKeeperGetACLRequest::writeImpl(WriteBuffer & out) const
Coordination::write(path, out); Coordination::write(path, out);
} }
std::string ZooKeeperGetACLRequest::toStringImpl() const std::string ZooKeeperGetACLRequest::toStringImpl(bool /*short_format*/) const
{ {
return fmt::format("path = {}", path); return fmt::format("path = {}", path);
} }
@ -455,7 +454,7 @@ void ZooKeeperCheckRequest::readImpl(ReadBuffer & in)
Coordination::read(version, in); Coordination::read(version, in);
} }
std::string ZooKeeperCheckRequest::toStringImpl() const std::string ZooKeeperCheckRequest::toStringImpl(bool /*short_format*/) const
{ {
return fmt::format("path = {}\nversion = {}", path, version); return fmt::format("path = {}\nversion = {}", path, version);
} }
@ -600,8 +599,11 @@ void ZooKeeperMultiRequest::readImpl(ReadBuffer & in)
} }
} }
std::string ZooKeeperMultiRequest::toStringImpl() const std::string ZooKeeperMultiRequest::toStringImpl(bool short_format) const
{ {
if (short_format)
return fmt::format("Subrequests size = {}", requests.size());
auto out = fmt::memory_buffer(); auto out = fmt::memory_buffer();
for (const auto & request : requests) for (const auto & request : requests)
{ {

View File

@ -63,12 +63,12 @@ struct ZooKeeperRequest : virtual Request
/// Writes length, xid, op_num, then the rest. /// Writes length, xid, op_num, then the rest.
void write(WriteBuffer & out) const; void write(WriteBuffer & out) const;
std::string toString() const; std::string toString(bool short_format = false) const;
virtual void writeImpl(WriteBuffer &) const = 0; virtual void writeImpl(WriteBuffer &) const = 0;
virtual void readImpl(ReadBuffer &) = 0; virtual void readImpl(ReadBuffer &) = 0;
virtual std::string toStringImpl() const { return ""; } virtual std::string toStringImpl(bool /*short_format*/) const { return ""; }
static std::shared_ptr<ZooKeeperRequest> read(ReadBuffer & in); static std::shared_ptr<ZooKeeperRequest> read(ReadBuffer & in);
@ -98,7 +98,7 @@ struct ZooKeeperSyncRequest final : ZooKeeperRequest
OpNum getOpNum() const override { return OpNum::Sync; } OpNum getOpNum() const override { return OpNum::Sync; }
void writeImpl(WriteBuffer & out) const override; void writeImpl(WriteBuffer & out) const override;
void readImpl(ReadBuffer & in) override; void readImpl(ReadBuffer & in) override;
std::string toStringImpl() const override; std::string toStringImpl(bool short_format) const override;
ZooKeeperResponsePtr makeResponse() const override; ZooKeeperResponsePtr makeResponse() const override;
bool isReadRequest() const override { return false; } bool isReadRequest() const override { return false; }
@ -123,7 +123,7 @@ struct ZooKeeperReconfigRequest final : ZooKeeperRequest
OpNum getOpNum() const override { return OpNum::Reconfig; } OpNum getOpNum() const override { return OpNum::Reconfig; }
void writeImpl(WriteBuffer & out) const override; void writeImpl(WriteBuffer & out) const override;
void readImpl(ReadBuffer & in) override; void readImpl(ReadBuffer & in) override;
std::string toStringImpl() const override; std::string toStringImpl(bool short_format) const override;
ZooKeeperResponsePtr makeResponse() const override; ZooKeeperResponsePtr makeResponse() const override;
bool isReadRequest() const override { return false; } bool isReadRequest() const override { return false; }
@ -176,7 +176,7 @@ struct ZooKeeperAuthRequest final : ZooKeeperRequest
OpNum getOpNum() const override { return OpNum::Auth; } OpNum getOpNum() const override { return OpNum::Auth; }
void writeImpl(WriteBuffer & out) const override; void writeImpl(WriteBuffer & out) const override;
void readImpl(ReadBuffer & in) override; void readImpl(ReadBuffer & in) override;
std::string toStringImpl() const override; std::string toStringImpl(bool short_format) const override;
ZooKeeperResponsePtr makeResponse() const override; ZooKeeperResponsePtr makeResponse() const override;
bool isReadRequest() const override { return false; } bool isReadRequest() const override { return false; }
@ -229,7 +229,7 @@ struct ZooKeeperCreateRequest final : public CreateRequest, ZooKeeperRequest
OpNum getOpNum() const override { return not_exists ? OpNum::CreateIfNotExists : OpNum::Create; } OpNum getOpNum() const override { return not_exists ? OpNum::CreateIfNotExists : OpNum::Create; }
void writeImpl(WriteBuffer & out) const override; void writeImpl(WriteBuffer & out) const override;
void readImpl(ReadBuffer & in) override; void readImpl(ReadBuffer & in) override;
std::string toStringImpl() const override; std::string toStringImpl(bool short_format) const override;
ZooKeeperResponsePtr makeResponse() const override; ZooKeeperResponsePtr makeResponse() const override;
bool isReadRequest() const override { return false; } bool isReadRequest() const override { return false; }
@ -266,7 +266,7 @@ struct ZooKeeperRemoveRequest final : RemoveRequest, ZooKeeperRequest
OpNum getOpNum() const override { return OpNum::Remove; } OpNum getOpNum() const override { return OpNum::Remove; }
void writeImpl(WriteBuffer & out) const override; void writeImpl(WriteBuffer & out) const override;
void readImpl(ReadBuffer & in) override; void readImpl(ReadBuffer & in) override;
std::string toStringImpl() const override; std::string toStringImpl(bool short_format) const override;
ZooKeeperResponsePtr makeResponse() const override; ZooKeeperResponsePtr makeResponse() const override;
bool isReadRequest() const override { return false; } bool isReadRequest() const override { return false; }
@ -293,7 +293,7 @@ struct ZooKeeperExistsRequest final : ExistsRequest, ZooKeeperRequest
OpNum getOpNum() const override { return OpNum::Exists; } OpNum getOpNum() const override { return OpNum::Exists; }
void writeImpl(WriteBuffer & out) const override; void writeImpl(WriteBuffer & out) const override;
void readImpl(ReadBuffer & in) override; void readImpl(ReadBuffer & in) override;
std::string toStringImpl() const override; std::string toStringImpl(bool short_format) const override;
ZooKeeperResponsePtr makeResponse() const override; ZooKeeperResponsePtr makeResponse() const override;
bool isReadRequest() const override { return true; } bool isReadRequest() const override { return true; }
@ -320,7 +320,7 @@ struct ZooKeeperGetRequest final : GetRequest, ZooKeeperRequest
OpNum getOpNum() const override { return OpNum::Get; } OpNum getOpNum() const override { return OpNum::Get; }
void writeImpl(WriteBuffer & out) const override; void writeImpl(WriteBuffer & out) const override;
void readImpl(ReadBuffer & in) override; void readImpl(ReadBuffer & in) override;
std::string toStringImpl() const override; std::string toStringImpl(bool short_format) const override;
ZooKeeperResponsePtr makeResponse() const override; ZooKeeperResponsePtr makeResponse() const override;
bool isReadRequest() const override { return true; } bool isReadRequest() const override { return true; }
@ -347,7 +347,7 @@ struct ZooKeeperSetRequest final : SetRequest, ZooKeeperRequest
OpNum getOpNum() const override { return OpNum::Set; } OpNum getOpNum() const override { return OpNum::Set; }
void writeImpl(WriteBuffer & out) const override; void writeImpl(WriteBuffer & out) const override;
void readImpl(ReadBuffer & in) override; void readImpl(ReadBuffer & in) override;
std::string toStringImpl() const override; std::string toStringImpl(bool short_format) const override;
ZooKeeperResponsePtr makeResponse() const override; ZooKeeperResponsePtr makeResponse() const override;
bool isReadRequest() const override { return false; } bool isReadRequest() const override { return false; }
@ -375,7 +375,7 @@ struct ZooKeeperListRequest : ListRequest, ZooKeeperRequest
OpNum getOpNum() const override { return OpNum::List; } OpNum getOpNum() const override { return OpNum::List; }
void writeImpl(WriteBuffer & out) const override; void writeImpl(WriteBuffer & out) const override;
void readImpl(ReadBuffer & in) override; void readImpl(ReadBuffer & in) override;
std::string toStringImpl() const override; std::string toStringImpl(bool short_format) const override;
ZooKeeperResponsePtr makeResponse() const override; ZooKeeperResponsePtr makeResponse() const override;
bool isReadRequest() const override { return true; } bool isReadRequest() const override { return true; }
@ -395,7 +395,7 @@ struct ZooKeeperFilteredListRequest final : ZooKeeperListRequest
OpNum getOpNum() const override { return OpNum::FilteredList; } OpNum getOpNum() const override { return OpNum::FilteredList; }
void writeImpl(WriteBuffer & out) const override; void writeImpl(WriteBuffer & out) const override;
void readImpl(ReadBuffer & in) override; void readImpl(ReadBuffer & in) override;
std::string toStringImpl() const override; std::string toStringImpl(bool short_format) const override;
size_t bytesSize() const override { return ZooKeeperListRequest::bytesSize() + sizeof(list_request_type); } size_t bytesSize() const override { return ZooKeeperListRequest::bytesSize() + sizeof(list_request_type); }
}; };
@ -428,7 +428,7 @@ struct ZooKeeperCheckRequest : CheckRequest, ZooKeeperRequest
OpNum getOpNum() const override { return not_exists ? OpNum::CheckNotExists : OpNum::Check; } OpNum getOpNum() const override { return not_exists ? OpNum::CheckNotExists : OpNum::Check; }
void writeImpl(WriteBuffer & out) const override; void writeImpl(WriteBuffer & out) const override;
void readImpl(ReadBuffer & in) override; void readImpl(ReadBuffer & in) override;
std::string toStringImpl() const override; std::string toStringImpl(bool short_format) const override;
ZooKeeperResponsePtr makeResponse() const override; ZooKeeperResponsePtr makeResponse() const override;
bool isReadRequest() const override { return true; } bool isReadRequest() const override { return true; }
@ -469,7 +469,7 @@ struct ZooKeeperSetACLRequest final : SetACLRequest, ZooKeeperRequest
OpNum getOpNum() const override { return OpNum::SetACL; } OpNum getOpNum() const override { return OpNum::SetACL; }
void writeImpl(WriteBuffer & out) const override; void writeImpl(WriteBuffer & out) const override;
void readImpl(ReadBuffer & in) override; void readImpl(ReadBuffer & in) override;
std::string toStringImpl() const override; std::string toStringImpl(bool short_format) const override;
ZooKeeperResponsePtr makeResponse() const override; ZooKeeperResponsePtr makeResponse() const override;
bool isReadRequest() const override { return false; } bool isReadRequest() const override { return false; }
@ -490,7 +490,7 @@ struct ZooKeeperGetACLRequest final : GetACLRequest, ZooKeeperRequest
OpNum getOpNum() const override { return OpNum::GetACL; } OpNum getOpNum() const override { return OpNum::GetACL; }
void writeImpl(WriteBuffer & out) const override; void writeImpl(WriteBuffer & out) const override;
void readImpl(ReadBuffer & in) override; void readImpl(ReadBuffer & in) override;
std::string toStringImpl() const override; std::string toStringImpl(bool short_format) const override;
ZooKeeperResponsePtr makeResponse() const override; ZooKeeperResponsePtr makeResponse() const override;
bool isReadRequest() const override { return true; } bool isReadRequest() const override { return true; }
@ -516,7 +516,7 @@ struct ZooKeeperMultiRequest final : MultiRequest, ZooKeeperRequest
void writeImpl(WriteBuffer & out) const override; void writeImpl(WriteBuffer & out) const override;
void readImpl(ReadBuffer & in) override; void readImpl(ReadBuffer & in) override;
std::string toStringImpl() const override; std::string toStringImpl(bool short_format) const override;
ZooKeeperResponsePtr makeResponse() const override; ZooKeeperResponsePtr makeResponse() const override;
bool isReadRequest() const override; bool isReadRequest() const override;

View File

@ -169,6 +169,23 @@ void KeeperConfigurationAndSettings::dump(WriteBufferFromOwnString & buf) const
writeText("async_replication=", buf); writeText("async_replication=", buf);
write_bool(coordination_settings->async_replication); write_bool(coordination_settings->async_replication);
writeText("latest_logs_cache_size_threshold=", buf);
write_int(coordination_settings->latest_logs_cache_size_threshold);
writeText("commit_logs_cache_size_threshold=", buf);
write_int(coordination_settings->commit_logs_cache_size_threshold);
writeText("disk_move_retries_wait_ms=", buf);
write_int(coordination_settings->disk_move_retries_wait_ms);
writeText("disk_move_retries_during_init=", buf);
write_int(coordination_settings->disk_move_retries_during_init);
writeText("log_slow_total_threshold_ms=", buf);
write_int(coordination_settings->log_slow_total_threshold_ms);
writeText("log_slow_cpu_threshold_ms=", buf);
write_int(coordination_settings->log_slow_cpu_threshold_ms);
writeText("log_slow_connection_operation_threshold_ms=", buf);
write_int(coordination_settings->log_slow_connection_operation_threshold_ms);
} }
KeeperConfigurationAndSettingsPtr KeeperConfigurationAndSettingsPtr

View File

@ -58,7 +58,10 @@ struct Settings;
M(UInt64, latest_logs_cache_size_threshold, 1 * 1024 * 1024 * 1024, "Maximum total size of in-memory cache of latest log entries.", 0) \ M(UInt64, latest_logs_cache_size_threshold, 1 * 1024 * 1024 * 1024, "Maximum total size of in-memory cache of latest log entries.", 0) \
M(UInt64, commit_logs_cache_size_threshold, 500 * 1024 * 1024, "Maximum total size of in-memory cache of log entries needed next for commit.", 0) \ M(UInt64, commit_logs_cache_size_threshold, 500 * 1024 * 1024, "Maximum total size of in-memory cache of log entries needed next for commit.", 0) \
M(UInt64, disk_move_retries_wait_ms, 1000, "How long to wait between retries after a failure which happened while a file was being moved between disks.", 0) \ M(UInt64, disk_move_retries_wait_ms, 1000, "How long to wait between retries after a failure which happened while a file was being moved between disks.", 0) \
M(UInt64, disk_move_retries_during_init, 100, "The amount of retries after a failure which happened while a file was being moved between disks during initialization.", 0) M(UInt64, disk_move_retries_during_init, 100, "The amount of retries after a failure which happened while a file was being moved between disks during initialization.", 0) \
M(UInt64, log_slow_total_threshold_ms, 5000, "Requests for which the total latency is larger than this settings will be logged", 0) \
M(UInt64, log_slow_cpu_threshold_ms, 100, "Requests for which the CPU (preprocessing and processing) latency is larger than this settings will be logged", 0) \
M(UInt64, log_slow_connection_operation_threshold_ms, 1000, "Log message if a certain operation took too long inside a single connection", 0)
DECLARE_SETTINGS_TRAITS(CoordinationSettingsTraits, LIST_OF_COORDINATION_SETTINGS) DECLARE_SETTINGS_TRAITS(CoordinationSettingsTraits, LIST_OF_COORDINATION_SETTINGS)

View File

@ -238,6 +238,13 @@
M(KeeperPacketsReceived) \ M(KeeperPacketsReceived) \
M(KeeperRequestTotal) \ M(KeeperRequestTotal) \
M(KeeperLatency) \ M(KeeperLatency) \
M(KeeperTotalElapsedMicroseconds) \
M(KeeperProcessElapsedMicroseconds) \
M(KeeperPreprocessElapsedMicroseconds) \
M(KeeperStorageLockWaitMicroseconds) \
M(KeeperCommitWaitElapsedMicroseconds) \
M(KeeperBatchMaxCount) \
M(KeeperBatchMaxTotalSize) \
M(KeeperCommits) \ M(KeeperCommits) \
M(KeeperCommitsFailed) \ M(KeeperCommitsFailed) \
M(KeeperSnapshotCreations) \ M(KeeperSnapshotCreations) \

View File

@ -31,6 +31,13 @@ namespace CurrentMetrics
extern const Metric KeeperOutstandingRequets; extern const Metric KeeperOutstandingRequets;
} }
namespace ProfileEvents
{
extern const Event KeeperCommitWaitElapsedMicroseconds;
extern const Event KeeperBatchMaxCount;
extern const Event KeeperBatchMaxTotalSize;
}
using namespace std::chrono_literals; using namespace std::chrono_literals;
namespace DB namespace DB
@ -119,6 +126,7 @@ void KeeperDispatcher::requestThread()
auto coordination_settings = configuration_and_settings->coordination_settings; auto coordination_settings = configuration_and_settings->coordination_settings;
uint64_t max_wait = coordination_settings->operation_timeout_ms.totalMilliseconds(); uint64_t max_wait = coordination_settings->operation_timeout_ms.totalMilliseconds();
uint64_t max_batch_bytes_size = coordination_settings->max_requests_batch_bytes_size; uint64_t max_batch_bytes_size = coordination_settings->max_requests_batch_bytes_size;
size_t max_batch_size = coordination_settings->max_requests_batch_size;
/// The code below do a very simple thing: batch all write (quorum) requests into vector until /// The code below do a very simple thing: batch all write (quorum) requests into vector until
/// previous write batch is not finished or max_batch size achieved. The main complexity goes from /// previous write batch is not finished or max_batch size achieved. The main complexity goes from
@ -188,7 +196,6 @@ void KeeperDispatcher::requestThread()
return false; return false;
}; };
size_t max_batch_size = coordination_settings->max_requests_batch_size;
while (!shutdown_called && current_batch.size() < max_batch_size && !has_reconfig_request while (!shutdown_called && current_batch.size() < max_batch_size && !has_reconfig_request
&& current_batch_bytes_size < max_batch_bytes_size && try_get_request()) && current_batch_bytes_size < max_batch_bytes_size && try_get_request())
; ;
@ -225,6 +232,12 @@ void KeeperDispatcher::requestThread()
/// Process collected write requests batch /// Process collected write requests batch
if (!current_batch.empty()) if (!current_batch.empty())
{ {
if (current_batch.size() == max_batch_size)
ProfileEvents::increment(ProfileEvents::KeeperBatchMaxCount, 1);
if (current_batch_bytes_size == max_batch_bytes_size)
ProfileEvents::increment(ProfileEvents::KeeperBatchMaxTotalSize, 1);
LOG_TRACE(log, "Processing requests batch, size: {}, bytes: {}", current_batch.size(), current_batch_bytes_size); LOG_TRACE(log, "Processing requests batch, size: {}, bytes: {}", current_batch.size(), current_batch_bytes_size);
auto result = server->putRequestBatch(current_batch); auto result = server->putRequestBatch(current_batch);
@ -243,6 +256,8 @@ void KeeperDispatcher::requestThread()
/// If we will execute read or reconfig next, we have to process result now /// If we will execute read or reconfig next, we have to process result now
if (execute_requests_after_write) if (execute_requests_after_write)
{ {
Stopwatch watch;
SCOPE_EXIT(ProfileEvents::increment(ProfileEvents::KeeperCommitWaitElapsedMicroseconds, watch.elapsedMicroseconds()));
if (prev_result) if (prev_result)
result_buf = forceWaitAndProcessResult( result_buf = forceWaitAndProcessResult(
prev_result, prev_batch, /*clear_requests_on_success=*/!execute_requests_after_write); prev_result, prev_batch, /*clear_requests_on_success=*/!execute_requests_after_write);

View File

@ -1,12 +1,14 @@
#include <atomic> #include <atomic>
#include <cerrno> #include <cerrno>
#include <chrono>
#include <Coordination/KeeperDispatcher.h>
#include <Coordination/KeeperReconfiguration.h>
#include <Coordination/KeeperSnapshotManager.h> #include <Coordination/KeeperSnapshotManager.h>
#include <Coordination/KeeperStateMachine.h> #include <Coordination/KeeperStateMachine.h>
#include <Coordination/KeeperDispatcher.h>
#include <Coordination/KeeperStorage.h> #include <Coordination/KeeperStorage.h>
#include <Coordination/KeeperReconfiguration.h>
#include <Coordination/ReadBufferFromNuraftBuffer.h> #include <Coordination/ReadBufferFromNuraftBuffer.h>
#include <Coordination/WriteBufferFromNuraftBuffer.h> #include <Coordination/WriteBufferFromNuraftBuffer.h>
#include <Disks/DiskLocal.h>
#include <IO/ReadHelpers.h> #include <IO/ReadHelpers.h>
#include <base/defines.h> #include <base/defines.h>
#include <base/errnoToString.h> #include <base/errnoToString.h>
@ -17,7 +19,6 @@
#include <Common/ZooKeeper/ZooKeeperCommon.h> #include <Common/ZooKeeper/ZooKeeperCommon.h>
#include <Common/ZooKeeper/ZooKeeperIO.h> #include <Common/ZooKeeper/ZooKeeperIO.h>
#include <Common/logger_useful.h> #include <Common/logger_useful.h>
#include <Disks/DiskLocal.h>
namespace ProfileEvents namespace ProfileEvents
@ -31,6 +32,7 @@ namespace ProfileEvents
extern const Event KeeperSnapshotApplysFailed; extern const Event KeeperSnapshotApplysFailed;
extern const Event KeeperReadSnapshot; extern const Event KeeperReadSnapshot;
extern const Event KeeperSaveSnapshot; extern const Event KeeperSaveSnapshot;
extern const Event KeeperStorageLockWaitMicroseconds;
} }
namespace DB namespace DB
@ -151,6 +153,20 @@ void assertDigest(
} }
} }
struct TSA_SCOPED_LOCKABLE LockGuardWithStats final
{
std::unique_lock<std::mutex> lock;
explicit LockGuardWithStats(std::mutex & mutex) TSA_ACQUIRE(mutex)
{
Stopwatch watch;
std::unique_lock l(mutex);
ProfileEvents::increment(ProfileEvents::KeeperStorageLockWaitMicroseconds, watch.elapsedMicroseconds());
lock = std::move(l);
}
~LockGuardWithStats() TSA_RELEASE() = default;
};
} }
nuraft::ptr<nuraft::buffer> KeeperStateMachine::pre_commit(uint64_t log_idx, nuraft::buffer & data) nuraft::ptr<nuraft::buffer> KeeperStateMachine::pre_commit(uint64_t log_idx, nuraft::buffer & data)
@ -272,7 +288,7 @@ bool KeeperStateMachine::preprocess(const KeeperStorage::RequestForSession & req
if (op_num == Coordination::OpNum::SessionID || op_num == Coordination::OpNum::Reconfig) if (op_num == Coordination::OpNum::SessionID || op_num == Coordination::OpNum::Reconfig)
return true; return true;
std::lock_guard lock(storage_and_responses_lock); LockGuardWithStats lock(storage_and_responses_lock);
if (storage->isFinalized()) if (storage->isFinalized())
return false; return false;
@ -302,7 +318,7 @@ bool KeeperStateMachine::preprocess(const KeeperStorage::RequestForSession & req
void KeeperStateMachine::reconfigure(const KeeperStorage::RequestForSession& request_for_session) void KeeperStateMachine::reconfigure(const KeeperStorage::RequestForSession& request_for_session)
{ {
std::lock_guard _(storage_and_responses_lock); LockGuardWithStats lock(storage_and_responses_lock);
KeeperStorage::ResponseForSession response = processReconfiguration(request_for_session); KeeperStorage::ResponseForSession response = processReconfiguration(request_for_session);
if (!responses_queue.push(response)) if (!responses_queue.push(response))
{ {
@ -391,7 +407,7 @@ nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, n
if (!keeper_context->localLogsPreprocessed() && !preprocess(*request_for_session)) if (!keeper_context->localLogsPreprocessed() && !preprocess(*request_for_session))
return nullptr; return nullptr;
auto try_push = [this](const KeeperStorage::ResponseForSession& response) auto try_push = [&](const KeeperStorage::ResponseForSession& response)
{ {
if (!responses_queue.push(response)) if (!responses_queue.push(response))
{ {
@ -400,6 +416,17 @@ nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, n
"Failed to push response with session id {} to the queue, probably because of shutdown", "Failed to push response with session id {} to the queue, probably because of shutdown",
response.session_id); response.session_id);
} }
using namespace std::chrono;
uint64_t elapsed = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count() - request_for_session->time;
if (elapsed > keeper_context->getCoordinationSettings()->log_slow_total_threshold_ms)
{
LOG_INFO(
log,
"Total time to process a request took too long ({}ms).\nRequest info: {}",
elapsed,
request_for_session->request->toString(/*short_format=*/true));
}
}; };
try try
@ -417,7 +444,7 @@ nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, n
response_for_session.session_id = -1; response_for_session.session_id = -1;
response_for_session.response = response; response_for_session.response = response;
std::lock_guard lock(storage_and_responses_lock); LockGuardWithStats lock(storage_and_responses_lock);
session_id = storage->getSessionID(session_id_request.session_timeout_ms); session_id = storage->getSessionID(session_id_request.session_timeout_ms);
LOG_DEBUG(log, "Session ID response {} with timeout {}", session_id, session_id_request.session_timeout_ms); LOG_DEBUG(log, "Session ID response {} with timeout {}", session_id, session_id_request.session_timeout_ms);
response->session_id = session_id; response->session_id = session_id;
@ -426,12 +453,13 @@ nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, n
else else
{ {
if (op_num == Coordination::OpNum::Close) if (op_num == Coordination::OpNum::Close)
{ {
std::lock_guard lock(request_cache_mutex); std::lock_guard lock(request_cache_mutex);
parsed_request_cache.erase(request_for_session->session_id); parsed_request_cache.erase(request_for_session->session_id);
} }
std::lock_guard lock(storage_and_responses_lock); LockGuardWithStats lock(storage_and_responses_lock);
KeeperStorage::ResponsesForSessions responses_for_sessions KeeperStorage::ResponsesForSessions responses_for_sessions
= storage->processRequest(request_for_session->request, request_for_session->session_id, request_for_session->zxid); = storage->processRequest(request_for_session->request, request_for_session->session_id, request_for_session->zxid);
for (auto & response_for_session : responses_for_sessions) for (auto & response_for_session : responses_for_sessions)
@ -482,7 +510,7 @@ bool KeeperStateMachine::apply_snapshot(nuraft::snapshot & s)
} }
{ /// deserialize and apply snapshot to storage { /// deserialize and apply snapshot to storage
std::lock_guard lock(storage_and_responses_lock); LockGuardWithStats lock(storage_and_responses_lock);
SnapshotDeserializationResult snapshot_deserialization_result; SnapshotDeserializationResult snapshot_deserialization_result;
if (latest_snapshot_ptr) if (latest_snapshot_ptr)
@ -534,7 +562,7 @@ void KeeperStateMachine::rollbackRequest(const KeeperStorage::RequestForSession
if (request_for_session.request->getOpNum() == Coordination::OpNum::SessionID) if (request_for_session.request->getOpNum() == Coordination::OpNum::SessionID)
return; return;
std::lock_guard lock(storage_and_responses_lock); LockGuardWithStats lock(storage_and_responses_lock);
storage->rollbackRequest(request_for_session.zxid, allow_missing); storage->rollbackRequest(request_for_session.zxid, allow_missing);
} }
@ -561,7 +589,7 @@ void KeeperStateMachine::create_snapshot(nuraft::snapshot & s, nuraft::async_res
auto snapshot_meta_copy = nuraft::snapshot::deserialize(*snp_buf); auto snapshot_meta_copy = nuraft::snapshot::deserialize(*snp_buf);
CreateSnapshotTask snapshot_task; CreateSnapshotTask snapshot_task;
{ /// lock storage for a short period time to turn on "snapshot mode". After that we can read consistent storage state without locking. { /// lock storage for a short period time to turn on "snapshot mode". After that we can read consistent storage state without locking.
std::lock_guard lock(storage_and_responses_lock); LockGuardWithStats lock(storage_and_responses_lock);
snapshot_task.snapshot = std::make_shared<KeeperStorageSnapshot>(storage.get(), snapshot_meta_copy, getClusterConfig()); snapshot_task.snapshot = std::make_shared<KeeperStorageSnapshot>(storage.get(), snapshot_meta_copy, getClusterConfig());
} }
@ -623,7 +651,7 @@ void KeeperStateMachine::create_snapshot(nuraft::snapshot & s, nuraft::async_res
} }
{ {
/// Destroy snapshot with lock /// Destroy snapshot with lock
std::lock_guard lock(storage_and_responses_lock); LockGuardWithStats lock(storage_and_responses_lock);
LOG_TRACE(log, "Clearing garbage after snapshot"); LOG_TRACE(log, "Clearing garbage after snapshot");
/// Turn off "snapshot mode" and clear outdate part of storage state /// Turn off "snapshot mode" and clear outdate part of storage state
storage->clearGarbageAfterSnapshot(); storage->clearGarbageAfterSnapshot();
@ -764,7 +792,7 @@ int KeeperStateMachine::read_logical_snp_obj(
void KeeperStateMachine::processReadRequest(const KeeperStorage::RequestForSession & request_for_session) void KeeperStateMachine::processReadRequest(const KeeperStorage::RequestForSession & request_for_session)
{ {
/// Pure local request, just process it with storage /// Pure local request, just process it with storage
std::lock_guard lock(storage_and_responses_lock); LockGuardWithStats lock(storage_and_responses_lock);
auto responses = storage->processRequest( auto responses = storage->processRequest(
request_for_session.request, request_for_session.session_id, std::nullopt, true /*check_acl*/, true /*is_local*/); request_for_session.request, request_for_session.session_id, std::nullopt, true /*check_acl*/, true /*is_local*/);
for (const auto & response : responses) for (const auto & response : responses)
@ -774,97 +802,97 @@ void KeeperStateMachine::processReadRequest(const KeeperStorage::RequestForSessi
void KeeperStateMachine::shutdownStorage() void KeeperStateMachine::shutdownStorage()
{ {
std::lock_guard lock(storage_and_responses_lock); LockGuardWithStats lock(storage_and_responses_lock);
storage->finalize(); storage->finalize();
} }
std::vector<int64_t> KeeperStateMachine::getDeadSessions() std::vector<int64_t> KeeperStateMachine::getDeadSessions()
{ {
std::lock_guard lock(storage_and_responses_lock); LockGuardWithStats lock(storage_and_responses_lock);
return storage->getDeadSessions(); return storage->getDeadSessions();
} }
int64_t KeeperStateMachine::getNextZxid() const int64_t KeeperStateMachine::getNextZxid() const
{ {
std::lock_guard lock(storage_and_responses_lock); LockGuardWithStats lock(storage_and_responses_lock);
return storage->getNextZXID(); return storage->getNextZXID();
} }
KeeperStorage::Digest KeeperStateMachine::getNodesDigest() const KeeperStorage::Digest KeeperStateMachine::getNodesDigest() const
{ {
std::lock_guard lock(storage_and_responses_lock); LockGuardWithStats lock(storage_and_responses_lock);
return storage->getNodesDigest(false); return storage->getNodesDigest(false);
} }
uint64_t KeeperStateMachine::getLastProcessedZxid() const uint64_t KeeperStateMachine::getLastProcessedZxid() const
{ {
std::lock_guard lock(storage_and_responses_lock); LockGuardWithStats lock(storage_and_responses_lock);
return storage->getZXID(); return storage->getZXID();
} }
uint64_t KeeperStateMachine::getNodesCount() const uint64_t KeeperStateMachine::getNodesCount() const
{ {
std::lock_guard lock(storage_and_responses_lock); LockGuardWithStats lock(storage_and_responses_lock);
return storage->getNodesCount(); return storage->getNodesCount();
} }
uint64_t KeeperStateMachine::getTotalWatchesCount() const uint64_t KeeperStateMachine::getTotalWatchesCount() const
{ {
std::lock_guard lock(storage_and_responses_lock); LockGuardWithStats lock(storage_and_responses_lock);
return storage->getTotalWatchesCount(); return storage->getTotalWatchesCount();
} }
uint64_t KeeperStateMachine::getWatchedPathsCount() const uint64_t KeeperStateMachine::getWatchedPathsCount() const
{ {
std::lock_guard lock(storage_and_responses_lock); LockGuardWithStats lock(storage_and_responses_lock);
return storage->getWatchedPathsCount(); return storage->getWatchedPathsCount();
} }
uint64_t KeeperStateMachine::getSessionsWithWatchesCount() const uint64_t KeeperStateMachine::getSessionsWithWatchesCount() const
{ {
std::lock_guard lock(storage_and_responses_lock); LockGuardWithStats lock(storage_and_responses_lock);
return storage->getSessionsWithWatchesCount(); return storage->getSessionsWithWatchesCount();
} }
uint64_t KeeperStateMachine::getTotalEphemeralNodesCount() const uint64_t KeeperStateMachine::getTotalEphemeralNodesCount() const
{ {
std::lock_guard lock(storage_and_responses_lock); LockGuardWithStats lock(storage_and_responses_lock);
return storage->getTotalEphemeralNodesCount(); return storage->getTotalEphemeralNodesCount();
} }
uint64_t KeeperStateMachine::getSessionWithEphemeralNodesCount() const uint64_t KeeperStateMachine::getSessionWithEphemeralNodesCount() const
{ {
std::lock_guard lock(storage_and_responses_lock); LockGuardWithStats lock(storage_and_responses_lock);
return storage->getSessionWithEphemeralNodesCount(); return storage->getSessionWithEphemeralNodesCount();
} }
void KeeperStateMachine::dumpWatches(WriteBufferFromOwnString & buf) const void KeeperStateMachine::dumpWatches(WriteBufferFromOwnString & buf) const
{ {
std::lock_guard lock(storage_and_responses_lock); LockGuardWithStats lock(storage_and_responses_lock);
storage->dumpWatches(buf); storage->dumpWatches(buf);
} }
void KeeperStateMachine::dumpWatchesByPath(WriteBufferFromOwnString & buf) const void KeeperStateMachine::dumpWatchesByPath(WriteBufferFromOwnString & buf) const
{ {
std::lock_guard lock(storage_and_responses_lock); LockGuardWithStats lock(storage_and_responses_lock);
storage->dumpWatchesByPath(buf); storage->dumpWatchesByPath(buf);
} }
void KeeperStateMachine::dumpSessionsAndEphemerals(WriteBufferFromOwnString & buf) const void KeeperStateMachine::dumpSessionsAndEphemerals(WriteBufferFromOwnString & buf) const
{ {
std::lock_guard lock(storage_and_responses_lock); LockGuardWithStats lock(storage_and_responses_lock);
storage->dumpSessionsAndEphemerals(buf); storage->dumpSessionsAndEphemerals(buf);
} }
uint64_t KeeperStateMachine::getApproximateDataSize() const uint64_t KeeperStateMachine::getApproximateDataSize() const
{ {
std::lock_guard lock(storage_and_responses_lock); LockGuardWithStats lock(storage_and_responses_lock);
return storage->getApproximateDataSize(); return storage->getApproximateDataSize();
} }
uint64_t KeeperStateMachine::getKeyArenaSize() const uint64_t KeeperStateMachine::getKeyArenaSize() const
{ {
std::lock_guard lock(storage_and_responses_lock); LockGuardWithStats lock(storage_and_responses_lock);
return storage->getArenaDataSize(); return storage->getArenaDataSize();
} }
@ -905,7 +933,7 @@ ClusterConfigPtr KeeperStateMachine::getClusterConfig() const
void KeeperStateMachine::recalculateStorageStats() void KeeperStateMachine::recalculateStorageStats()
{ {
std::lock_guard lock(storage_and_responses_lock); LockGuardWithStats lock(storage_and_responses_lock);
LOG_INFO(log, "Recalculating storage stats"); LOG_INFO(log, "Recalculating storage stats");
storage->recalculateStats(); storage->recalculateStats();
LOG_INFO(log, "Done recalculating storage stats"); LOG_INFO(log, "Done recalculating storage stats");

View File

@ -182,8 +182,7 @@ private:
KeeperSnapshotManagerS3 * snapshot_manager_s3; KeeperSnapshotManagerS3 * snapshot_manager_s3;
KeeperStorage::ResponseForSession processReconfiguration( KeeperStorage::ResponseForSession processReconfiguration(const KeeperStorage::RequestForSession & request_for_session)
const KeeperStorage::RequestForSession& request_for_session)
TSA_REQUIRES(storage_and_responses_lock); TSA_REQUIRES(storage_and_responses_lock);
}; };
} }

View File

@ -40,6 +40,8 @@ namespace ProfileEvents
extern const Event KeeperGetRequest; extern const Event KeeperGetRequest;
extern const Event KeeperListRequest; extern const Event KeeperListRequest;
extern const Event KeeperExistsRequest; extern const Event KeeperExistsRequest;
extern const Event KeeperPreprocessElapsedMicroseconds;
extern const Event KeeperProcessElapsedMicroseconds;
} }
namespace DB namespace DB
@ -2309,6 +2311,20 @@ void KeeperStorage::preprocessRequest(
std::optional<Digest> digest, std::optional<Digest> digest,
int64_t log_idx) int64_t log_idx)
{ {
Stopwatch watch;
SCOPE_EXIT({
auto elapsed = watch.elapsedMicroseconds();
if (auto elapsed_ms = elapsed / 1000; elapsed_ms > keeper_context->getCoordinationSettings()->log_slow_cpu_threshold_ms)
{
LOG_INFO(
getLogger("KeeperStorage"),
"Preprocessing a request took too long ({}ms).\nRequest info: {}",
elapsed_ms,
zk_request->toString(/*short_format=*/true));
}
ProfileEvents::increment(ProfileEvents::KeeperPreprocessElapsedMicroseconds, elapsed);
});
if (!initialized) if (!initialized)
throw Exception(ErrorCodes::LOGICAL_ERROR, "KeeperStorage system nodes are not initialized"); throw Exception(ErrorCodes::LOGICAL_ERROR, "KeeperStorage system nodes are not initialized");
@ -2409,6 +2425,20 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(
bool check_acl, bool check_acl,
bool is_local) bool is_local)
{ {
Stopwatch watch;
SCOPE_EXIT({
auto elapsed = watch.elapsedMicroseconds();
if (auto elapsed_ms = elapsed / 1000; elapsed_ms > keeper_context->getCoordinationSettings()->log_slow_cpu_threshold_ms)
{
LOG_INFO(
getLogger("KeeperStorage"),
"Processing a request took too long ({}ms).\nRequest info: {}",
elapsed_ms,
zk_request->toString(/*short_format=*/true));
}
ProfileEvents::increment(ProfileEvents::KeeperProcessElapsedMicroseconds, elapsed);
});
if (!initialized) if (!initialized)
throw Exception(ErrorCodes::LOGICAL_ERROR, "KeeperStorage system nodes are not initialized"); throw Exception(ErrorCodes::LOGICAL_ERROR, "KeeperStorage system nodes are not initialized");

View File

@ -3,6 +3,7 @@
#include <Common/HashTable/HashMap.h> #include <Common/HashTable/HashMap.h>
#include <Common/ArenaUtils.h> #include <Common/ArenaUtils.h>
#include <list>
namespace DB namespace DB
{ {

View File

@ -13,11 +13,9 @@
#include <Common/setThreadName.h> #include <Common/setThreadName.h>
#include <Common/logger_useful.h> #include <Common/logger_useful.h>
#include <base/defines.h> #include <base/defines.h>
#include <chrono>
#include <Common/PipeFDs.h> #include <Common/PipeFDs.h>
#include <Poco/Util/AbstractConfiguration.h> #include <Poco/Util/AbstractConfiguration.h>
#include <IO/ReadBufferFromFileDescriptor.h> #include <IO/ReadBufferFromFileDescriptor.h>
#include <queue>
#include <mutex> #include <mutex>
#include <Coordination/FourLetterCommand.h> #include <Coordination/FourLetterCommand.h>
#include <IO/CompressionMethod.h> #include <IO/CompressionMethod.h>
@ -30,6 +28,11 @@
#include <poll.h> #include <poll.h>
#endif #endif
namespace ProfileEvents
{
extern const Event KeeperTotalElapsedMicroseconds;
}
namespace DB namespace DB
{ {
@ -411,12 +414,12 @@ void KeeperTCPHandler::runImpl()
keeper_dispatcher->registerSession(session_id, response_callback); keeper_dispatcher->registerSession(session_id, response_callback);
Stopwatch logging_stopwatch; Stopwatch logging_stopwatch;
auto operation_max_ms = keeper_dispatcher->getKeeperContext()->getCoordinationSettings()->log_slow_connection_operation_threshold_ms;
auto log_long_operation = [&](const String & operation) auto log_long_operation = [&](const String & operation)
{ {
constexpr UInt64 operation_max_ms = 500;
auto elapsed_ms = logging_stopwatch.elapsedMilliseconds(); auto elapsed_ms = logging_stopwatch.elapsedMilliseconds();
if (operation_max_ms < elapsed_ms) if (operation_max_ms < elapsed_ms)
LOG_TEST(log, "{} for session {} took {} ms", operation, session_id, elapsed_ms); LOG_INFO(log, "{} for session {} took {} ms", operation, session_id, elapsed_ms);
logging_stopwatch.restart(); logging_stopwatch.restart();
}; };
@ -611,11 +614,13 @@ void KeeperTCPHandler::updateStats(Coordination::ZooKeeperResponsePtr & response
/// update statistics ignoring watch response and heartbeat. /// update statistics ignoring watch response and heartbeat.
if (response->xid != Coordination::WATCH_XID && response->getOpNum() != Coordination::OpNum::Heartbeat) if (response->xid != Coordination::WATCH_XID && response->getOpNum() != Coordination::OpNum::Heartbeat)
{ {
Int64 elapsed = (Poco::Timestamp() - operations[response->xid]) / 1000; Int64 elapsed = (Poco::Timestamp() - operations[response->xid]);
conn_stats.updateLatency(elapsed); ProfileEvents::increment(ProfileEvents::KeeperTotalElapsedMicroseconds, elapsed);
Int64 elapsed_ms = elapsed / 1000;
conn_stats.updateLatency(elapsed_ms);
operations.erase(response->xid); operations.erase(response->xid);
keeper_dispatcher->updateKeeperStatLatency(elapsed); keeper_dispatcher->updateKeeperStatLatency(elapsed_ms);
last_op.set(std::make_unique<LastOp>(LastOp{ last_op.set(std::make_unique<LastOp>(LastOp{
.name = Coordination::toString(response->getOpNum()), .name = Coordination::toString(response->getOpNum()),

View File

@ -293,6 +293,16 @@ def test_cmd_conf(started_cluster):
assert result["configuration_change_tries_count"] == "20" assert result["configuration_change_tries_count"] == "20"
assert result["async_replication"] == "true" assert result["async_replication"] == "true"
assert result["latest_logs_cache_size_threshold"] == "1073741824"
assert result["commit_logs_cache_size_threshold"] == "524288000"
assert result["disk_move_retries_wait_ms"] == "1000"
assert result["disk_move_retries_during_init"] == "100"
assert result["log_slow_total_threshold_ms"] == "5000"
assert result["log_slow_cpu_threshold_ms"] == "100"
assert result["log_slow_connection_operation_threshold_ms"] == "1000"
finally: finally:
close_keeper_socket(client) close_keeper_socket(client)