From 8515f3b3a2b99b70e82e7e00ed4d5d5323edc8d7 Mon Sep 17 00:00:00 2001 From: fuwhu Date: Wed, 11 Aug 2021 11:40:06 +0800 Subject: [PATCH] Add metric MaxPushedDDLEntryID. --- programs/server/Server.cpp | 4 +++- src/Common/CurrentMetrics.cpp | 1 + src/Interpreters/DDLWorker.cpp | 16 +++++++++++++++- src/Interpreters/DDLWorker.h | 3 ++- 4 files changed, 21 insertions(+), 3 deletions(-) diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp index 86bb04351b1..c69c48bb23d 100644 --- a/programs/server/Server.cpp +++ b/programs/server/Server.cpp @@ -126,6 +126,7 @@ namespace CurrentMetrics extern const Metric VersionInteger; extern const Metric MemoryTracking; extern const Metric MaxDDLEntryID; + extern const Metric MaxPushedDDLEntryID; } namespace fs = std::filesystem; @@ -1468,7 +1469,8 @@ if (ThreadFuzzer::instance().isEffective()) if (pool_size < 1) throw Exception("distributed_ddl.pool_size should be greater then 0", ErrorCodes::ARGUMENT_OUT_OF_BOUND); global_context->setDDLWorker(std::make_unique(pool_size, ddl_zookeeper_path, global_context, &config(), - "distributed_ddl", "DDLWorker", &CurrentMetrics::MaxDDLEntryID)); + "distributed_ddl", "DDLWorker", + &CurrentMetrics::MaxDDLEntryID, &CurrentMetrics::MaxPushedDDLEntryID)); } for (auto & server : *servers) diff --git a/src/Common/CurrentMetrics.cpp b/src/Common/CurrentMetrics.cpp index f94c3421107..9acefe8a2d8 100644 --- a/src/Common/CurrentMetrics.cpp +++ b/src/Common/CurrentMetrics.cpp @@ -60,6 +60,7 @@ M(BrokenDistributedFilesToInsert, "Number of files for asynchronous insertion into Distributed tables that has been marked as broken. This metric will starts from 0 on start. Number of files for every shard is summed.") \ M(TablesToDropQueueSize, "Number of dropped tables, that are waiting for background data removal.") \ M(MaxDDLEntryID, "Max processed DDL entry of DDLWorker.") \ + M(MaxPushedDDLEntryID, "Max DDL entry of DDLWorker that pushed to zookeeper.") \ M(PartsTemporary, "The part is generating now, it is not in data_parts list.") \ M(PartsPreCommitted, "The part is in data_parts, but not used for SELECTs.") \ M(PartsCommitted, "Active data part, used by current and upcoming SELECTs.") \ diff --git a/src/Interpreters/DDLWorker.cpp b/src/Interpreters/DDLWorker.cpp index 47ca2b72db8..c00f62f5133 100644 --- a/src/Interpreters/DDLWorker.cpp +++ b/src/Interpreters/DDLWorker.cpp @@ -158,15 +158,20 @@ DDLWorker::DDLWorker( const Poco::Util::AbstractConfiguration * config, const String & prefix, const String & logger_name, - const CurrentMetrics::Metric * max_entry_metric_) + const CurrentMetrics::Metric * max_entry_metric_, + const CurrentMetrics::Metric * max_pushed_entry_metric_) : context(Context::createCopy(context_)) , log(&Poco::Logger::get(logger_name)) , pool_size(pool_size_) , max_entry_metric(max_entry_metric_) + , max_pushed_entry_metric(max_pushed_entry_metric_) { if (max_entry_metric) CurrentMetrics::set(*max_entry_metric, 0); + if (max_pushed_entry_metric) + CurrentMetrics::set(*max_pushed_entry_metric, 0); + if (1 < pool_size) { LOG_WARNING(log, "DDLWorker is configured to use multiple threads. " @@ -1046,6 +1051,15 @@ String DDLWorker::enqueueQuery(DDLLogEntry & entry) zookeeper->createAncestors(query_path_prefix); String node_path = zookeeper->create(query_path_prefix, entry.toString(), zkutil::CreateMode::PersistentSequential); + if (max_pushed_entry_metric) + { + String str_buf = node_path.substr(query_path_prefix.length()); + DB::ReadBufferFromString in(str_buf); + CurrentMetrics::Metric id; + readText(id, in); + id = std::max(*max_pushed_entry_metric, id); + CurrentMetrics::set(*max_pushed_entry_metric, id); + } /// We cannot create status dirs in a single transaction with previous request, /// because we don't know node_path until previous request is executed. diff --git a/src/Interpreters/DDLWorker.h b/src/Interpreters/DDLWorker.h index d05b9b27611..d2b7c9d169d 100644 --- a/src/Interpreters/DDLWorker.h +++ b/src/Interpreters/DDLWorker.h @@ -44,7 +44,7 @@ class DDLWorker { public: DDLWorker(int pool_size_, const std::string & zk_root_dir, ContextPtr context_, const Poco::Util::AbstractConfiguration * config, const String & prefix, - const String & logger_name = "DDLWorker", const CurrentMetrics::Metric * max_entry_metric_ = nullptr); + const String & logger_name = "DDLWorker", const CurrentMetrics::Metric * max_entry_metric_ = nullptr, const CurrentMetrics::Metric * max_pushed_entry_metric_ = nullptr); virtual ~DDLWorker(); /// Pushes query into DDL queue, returns path to created node @@ -148,6 +148,7 @@ protected: std::atomic max_id = 0; const CurrentMetrics::Metric * max_entry_metric; + const CurrentMetrics::Metric * max_pushed_entry_metric; };