Synchronize metrics and Keeper

This commit is contained in:
Alexey Milovidov 2024-02-28 23:43:03 +01:00
parent a02d78426a
commit 763bd22725
17 changed files with 139 additions and 9 deletions

View File

@ -264,7 +264,17 @@
M(RefreshingViews, "Number of materialized views currently executing a refresh") \
M(StorageBufferFlushThreads, "Number of threads for background flushes in StorageBuffer") \
M(StorageBufferFlushThreadsActive, "Number of threads for background flushes in StorageBuffer running a task") \
M(StorageBufferFlushThreadsScheduled, "Number of queued or active threads for background flushes in StorageBuffer")
M(StorageBufferFlushThreadsScheduled, "Number of queued or active threads for background flushes in StorageBuffer") \
M(SharedMergeTreeThreads, "Number of threads in the thread pools in internals of SharedMergeTree") \
M(SharedMergeTreeThreadsActive, "Number of threads in the thread pools in internals of SharedMergeTree running a task") \
M(SharedMergeTreeThreadsScheduled, "Number of queued or active threads in the thread pools in internals of SharedMergeTree") \
M(SharedMergeTreeFetch, "Number of fetches in progress") \
M(CacheWarmerBytesInProgress, "Total size of remote file segments waiting to be asynchronously loaded into filesystem cache.") \
M(DistrCacheOpenedConnections, "Number of open connections to Distributed Cache") \
M(DistrCacheUsedConnections, "Number of currently used connections to Distributed Cache") \
M(DistrCacheReadRequests, "Number of executed Read requests to Distributed Cache") \
M(DistrCacheWriteRequests, "Number of executed Write requests to Distributed Cache") \
M(DistrCacheServerConnections, "Number of open connections to ClickHouse server from Distributed Cache")
#ifdef APPLY_FOR_EXTERNAL_METRICS
#define APPLY_FOR_METRICS(M) APPLY_FOR_BUILTIN_METRICS(M) APPLY_FOR_EXTERNAL_METRICS(M)

View File

@ -585,6 +585,10 @@
M(703, INVALID_IDENTIFIER) \
M(704, QUERY_CACHE_USED_WITH_NONDETERMINISTIC_FUNCTIONS) \
M(705, TABLE_NOT_EMPTY) \
\
M(900, DISTRIBUTED_CACHE_ERROR) \
M(901, CANNOT_USE_DISTRIBUTED_CACHE) \
\
M(706, LIBSSH_ERROR) \
M(707, GCP_ERROR) \
M(708, ILLEGAL_STATISTIC) \

View File

@ -39,6 +39,14 @@ static struct InitFiu
REGULAR(replicated_merge_tree_commit_zk_fail_when_recovering_from_hw_fault) \
REGULAR(use_delayed_remote_source) \
REGULAR(cluster_discovery_faults) \
ONCE(smt_commit_merge_mutate_zk_fail_after_op) \
ONCE(smt_commit_merge_mutate_zk_fail_before_op) \
ONCE(smt_commit_write_zk_fail_after_op) \
ONCE(smt_commit_write_zk_fail_before_op) \
ONCE(smt_commit_merge_change_version_before_op) \
ONCE(smt_merge_mutate_intention_freeze_in_destructor) \
ONCE(meta_in_keeper_create_metadata_failure) \
REGULAR(cache_warmer_stall) \
REGULAR(check_table_query_delay_for_part) \
REGULAR(dummy_failpoint) \
REGULAR(prefetched_reader_pool_failpoint) \

View File

@ -92,6 +92,8 @@
M(LocalWriteThrottlerBytes, "Bytes passed through 'max_local_write_bandwidth_for_server'/'max_local_write_bandwidth' throttler.") \
M(LocalWriteThrottlerSleepMicroseconds, "Total time a query was sleeping to conform 'max_local_write_bandwidth_for_server'/'max_local_write_bandwidth' throttling.") \
M(ThrottlerSleepMicroseconds, "Total time a query was sleeping to conform all throttling settings.") \
M(PartsWithAppliedMutationsOnFly, "Total number of parts for which there was any mutation applied on fly") \
M(MutationsAppliedOnFlyInAllParts, "The sum of number of applied mutations on-fly for part among all read parts") \
\
M(QueryMaskingRulesMatch, "Number of times query masking rules was successfully matched.") \
\
@ -311,6 +313,12 @@ The server successfully detected this situation and will download merged part fr
M(ParallelReplicasProcessingPartsMicroseconds, "Time spent processing data parts") \
M(ParallelReplicasStealingLeftoversMicroseconds, "Time spent collecting orphaned segments") \
M(ParallelReplicasCollectingOwnedSegmentsMicroseconds, "Time spent collecting segments meant by hash") \
M(ParallelReplicasNumRequests, "Number of requests to the initiator.") \
M(ParallelReplicasDeniedRequests, "Number of completely denied requests to the initiator") \
M(CacheWarmerBytesDownloaded, "Amount of data fetched into filesystem cache by dedicated background threads.") \
M(CacheWarmerDataPartsDownloaded, "Number of data parts that were fully fetched by CacheWarmer.") \
M(IgnoredColdParts, "See setting ignore_cold_parts_seconds. Number of times read queries ignored very new parts that weren't pulled into cache by CacheWarmer yet.") \
M(PreferredWarmedUnmergedParts, "See setting prefer_warmed_unmerged_parts_seconds. Number of times read queries used outdated pre-merge parts that are in cache instead of merged part that wasn't pulled into cache by CacheWarmer yet.") \
\
M(PerfCPUCycles, "Total cycles. Be wary of what happens during CPU frequency scaling.") \
M(PerfInstructions, "Retired instructions. Be careful, these can be affected by various issues, most notably hardware interrupt counts.") \
@ -516,6 +524,21 @@ The server successfully detected this situation and will download merged part fr
M(AggregationPreallocatedElementsInHashTables, "How many elements were preallocated in hash tables for aggregation.") \
M(AggregationHashTablesInitializedAsTwoLevel, "How many hash tables were inited as two-level for aggregation.") \
\
M(MetadataFromKeeperCacheHit, "Number of times an object storage metadata request was answered from cache without making request to Keeper") \
M(MetadataFromKeeperCacheMiss, "Number of times an object storage metadata request had to be answered from Keeper") \
M(MetadataFromKeeperCacheUpdateMicroseconds, "Total time spent in updating the cache including waiting for responses from Keeper") \
M(MetadataFromKeeperUpdateCacheOneLevel, "Number of times a cache update for one level of directory tree was done") \
M(MetadataFromKeeperTransactionCommit, "Number of times metadata transaction commit was attempted") \
M(MetadataFromKeeperTransactionCommitRetry, "Number of times metadata transaction commit was retried") \
M(MetadataFromKeeperCleanupTransactionCommit, "Number of times metadata transaction commit for deleted objects cleanup was attempted") \
M(MetadataFromKeeperCleanupTransactionCommitRetry, "Number of times metadata transaction commit for deleted objects cleanup was retried") \
M(MetadataFromKeeperOperations, "Number of times a request was made to Keeper") \
M(MetadataFromKeeperIndividualOperations, "Number of paths read or written by single or multi requests to Keeper") \
M(MetadataFromKeeperReconnects, "Number of times a reconnect to Keeper was done") \
M(MetadataFromKeeperBackgroundCleanupObjects, "Number of times a old deleted object clean up was performed by background task") \
M(MetadataFromKeeperBackgroundCleanupTransactions, "Number of times old transaction idempotency token was cleaned up by background task") \
M(MetadataFromKeeperBackgroundCleanupErrors, "Number of times an error was encountered in background cleanup task") \
\
M(KafkaRebalanceRevocations, "Number of partition revocations (the first stage of consumer group rebalance)") \
M(KafkaRebalanceAssignments, "Number of partition assignments (the final stage of consumer group rebalance)") \
M(KafkaRebalanceErrors, "Number of failed consumer group rebalances") \
@ -607,9 +630,32 @@ The server successfully detected this situation and will download merged part fr
M(MergeTreeAllRangesAnnouncementsSentElapsedMicroseconds, "Time spent in sending the announcement from the remote server to the initiator server about the set of data parts (for MergeTree tables). Measured on the remote server side.") \
\
M(ConnectionPoolIsFullMicroseconds, "Total time spent waiting for a slot in connection pool.") \
\
M(AsyncLoaderWaitMicroseconds, "Total time a query was waiting for async loader jobs.") \
\
M(DistrCacheServerSwitches, "Number of server switches between distributed cache servers in read/write-through cache") \
M(DistrCacheReadMicroseconds, "Time spent reading from distributed cache") \
M(DistrCacheFallbackReadMicroseconds, "Time spend reading from fallback buffer instead of distribted cache") \
M(DistrCachePrecomputeRangesMicroseconds, "Time spent to precompute read ranges") \
M(DistrCacheNextImplMicroseconds, "Time spend in ReadBufferFromDistributedCache::nextImpl") \
M(DistrCacheOpenedConnections, "The number of open connections to distributed cache") \
M(DistrCacheReusedConnections, "The number of reused connections to distributed cache") \
M(DistrCacheHoldConnections, "The number of used connections to distributed cache") \
\
M(DistrCacheGetResponseMicroseconds, "Time spend to wait for response from distributed cache") \
M(DistrCacheStartRangeMicroseconds, "Time spent to start a new read range with distributed cache") \
M(DistrCacheLockRegistryMicroseconds, "Time spent to take DistributedCacheRegistry lock") \
M(DistrCacheUnusedPackets, "Number of skipped unused packets from distributed cache") \
M(DistrCachePackets, "Total number of packets received from distributed cache") \
M(DistrCacheUnusedPacketsBytes, "The number of bytes in Data packets which were ignored") \
M(DistrCacheRegistryUpdateMicroseconds, "Time spent updating distributed cache registry") \
M(DistrCacheRegistryUpdates, "Number of distributed cache registry updates") \
\
M(DistrCacheConnectMicroseconds, "The time spent to connect to distributed cache") \
M(DistrCacheConnectAttempts, "The number of connection attempts to distributed cache") \
M(DistrCacheGetClient, "Number of client access times") \
\
M(DistrCacheServerProcessRequestMicroseconds, "Time spent processing request on DistributedCache server side") \
\
M(LogTest, "Number of log messages with level Test") \
M(LogTrace, "Number of log messages with level Trace") \
M(LogDebug, "Number of log messages with level Debug") \

View File

@ -10,6 +10,7 @@
#include <Interpreters/TextLog.h>
#include <Interpreters/TraceLog.h>
#include <Interpreters/FilesystemCacheLog.h>
#include <Interpreters/DistributedCacheLog.h>
#include <Interpreters/S3QueueLog.h>
#include <Interpreters/FilesystemReadPrefetchesLog.h>
#include <Interpreters/ProcessorsProfileLog.h>

View File

@ -29,6 +29,7 @@
M(TextLogElement) \
M(S3QueueLogElement) \
M(FilesystemCacheLogElement) \
M(DistributedCacheLogElement) \
M(FilesystemReadPrefetchesLogElement) \
M(AsynchronousInsertLogElement) \
M(BackupLogElement) \

View File

@ -196,8 +196,9 @@ bool ThreadStatus::isQueryCanceled() const
if (!thread_group)
return false;
chassert(local_data.query_is_canceled_predicate);
return local_data.query_is_canceled_predicate();
if (local_data.query_is_canceled_predicate)
return local_data.query_is_canceled_predicate();
return false;
}
ThreadStatus::~ThreadStatus()

View File

@ -8,6 +8,7 @@
#include <vector>
#include <memory>
#include <cstdint>
#include <span>
#include <functional>
/** Generic interface for ZooKeeper-like services.
@ -622,6 +623,10 @@ public:
int32_t version,
ReconfigCallback callback) = 0;
virtual void multi(
std::span<const RequestPtr> requests,
MultiCallback callback) = 0;
virtual void multi(
const Requests & requests,
MultiCallback callback) = 0;

View File

@ -157,6 +157,10 @@ struct TestKeeperReconfigRequest final : ReconfigRequest, TestKeeperRequest
struct TestKeeperMultiRequest final : MultiRequest, TestKeeperRequest
{
explicit TestKeeperMultiRequest(const Requests & generic_requests)
: TestKeeperMultiRequest(std::span(generic_requests))
{}
explicit TestKeeperMultiRequest(std::span<const RequestPtr> generic_requests)
{
requests.reserve(generic_requests.size());
@ -883,6 +887,13 @@ void TestKeeper::reconfig(
void TestKeeper::multi(
const Requests & requests,
MultiCallback callback)
{
return multi(std::span(requests), std::move(callback));
}
void TestKeeper::multi(
std::span<const RequestPtr> requests,
MultiCallback callback)
{
TestKeeperMultiRequest request(requests);

View File

@ -101,6 +101,10 @@ public:
const Requests & requests,
MultiCallback callback) override;
void multi(
std::span<const RequestPtr> requests,
MultiCallback callback) override;
void finalize(const String & reason) override;
bool isFeatureEnabled(DB::KeeperFeatureFlag) const override

View File

@ -1266,6 +1266,11 @@ std::future<Coordination::RemoveResponse> ZooKeeper::asyncTryRemoveNoThrow(const
}
std::future<Coordination::MultiResponse> ZooKeeper::asyncTryMultiNoThrow(const Coordination::Requests & ops)
{
return asyncTryMultiNoThrow(std::span(ops));
}
std::future<Coordination::MultiResponse> ZooKeeper::asyncTryMultiNoThrow(std::span<const Coordination::RequestPtr> ops)
{
auto promise = std::make_shared<std::promise<Coordination::MultiResponse>>();
auto future = promise->get_future();

View File

@ -550,6 +550,7 @@ public:
FutureMulti asyncMulti(const Coordination::Requests & ops);
/// Like the previous one but don't throw any exceptions on future.get()
FutureMulti asyncTryMultiNoThrow(const Coordination::Requests & ops);
FutureMulti asyncTryMultiNoThrow(std::span<const Coordination::RequestPtr> ops);
using FutureSync = std::future<Coordination::SyncResponse>;
FutureSync asyncSync(const std::string & path);

View File

@ -156,6 +156,12 @@ std::string ZooKeeperAuthRequest::toStringImpl() const
void ZooKeeperCreateRequest::writeImpl(WriteBuffer & out) const
{
/// See https://github.com/ClickHouse/clickhouse-private/issues/3029
if (path.starts_with("/clickhouse/tables/") && path.find("/parts/") != std::string::npos)
{
LOG_TRACE(getLogger(__PRETTY_FUNCTION__), "Creating part at path {}", path);
}
Coordination::write(path, out);
Coordination::write(data, out);
Coordination::write(acls, out);
@ -480,6 +486,10 @@ OpNum ZooKeeperMultiRequest::getOpNum() const
}
ZooKeeperMultiRequest::ZooKeeperMultiRequest(const Requests & generic_requests, const ACLs & default_acls)
: ZooKeeperMultiRequest(std::span{generic_requests}, default_acls)
{}
ZooKeeperMultiRequest::ZooKeeperMultiRequest(std::span<const Coordination::RequestPtr> generic_requests, const ACLs & default_acls)
{
/// Convert nested Requests to ZooKeeperRequests.
/// Note that deep copy is required to avoid modifying path in presence of chroot prefix.

View File

@ -7,17 +7,13 @@
#include <boost/noncopyable.hpp>
#include <IO/ReadBuffer.h>
#include <IO/WriteBuffer.h>
#include <map>
#include <unordered_map>
#include <mutex>
#include <chrono>
#include <vector>
#include <memory>
#include <thread>
#include <atomic>
#include <cstdint>
#include <optional>
#include <functional>
#include <span>
namespace Coordination
@ -516,6 +512,7 @@ struct ZooKeeperMultiRequest final : MultiRequest, ZooKeeperRequest
ZooKeeperMultiRequest() = default;
ZooKeeperMultiRequest(const Requests & generic_requests, const ACLs & default_acls);
ZooKeeperMultiRequest(std::span<const Coordination::RequestPtr> generic_requests, const ACLs & default_acls);
void writeImpl(WriteBuffer & out) const override;
void readImpl(ReadBuffer & in) override;

View File

@ -1454,6 +1454,13 @@ void ZooKeeper::reconfig(
void ZooKeeper::multi(
const Requests & requests,
MultiCallback callback)
{
multi(std::span(requests), std::move(callback));
}
void ZooKeeper::multi(
std::span<const RequestPtr> requests,
MultiCallback callback)
{
ZooKeeperMultiRequest request(requests, default_acls);

View File

@ -194,6 +194,10 @@ public:
int32_t version,
ReconfigCallback callback) final;
void multi(
std::span<const RequestPtr> requests,
MultiCallback callback) override;
void multi(
const Requests & requests,
MultiCallback callback) override;

View File

@ -147,6 +147,11 @@ public:
user_error = UserError{};
}
void setKeeperError(const zkutil::KeeperException & exception)
{
setKeeperError(std::make_exception_ptr(exception), exception.code, exception.message());
}
void stopRetries() { stop_retries = true; }
bool isLastRetry() const { return total_failures >= retries_info.max_retries; }
@ -180,6 +185,12 @@ private:
bool canTry()
{
if (unconditional_retry)
{
unconditional_retry = false;
return true;
}
if (iteration_succeeded)
{
if (logger && total_failures > 0)
@ -275,6 +286,10 @@ private:
UInt64 current_iteration = 0;
UInt64 current_backoff_ms = 0;
public:
/// This is used in SharedMergeTree
bool unconditional_retry = false;
};
}