Add metric MaxPushedDDLEntryID.

This commit is contained in:
fuwhu 2021-08-11 11:40:06 +08:00
parent d9c9422a44
commit 8515f3b3a2
4 changed files with 21 additions and 3 deletions

View File

@ -126,6 +126,7 @@ namespace CurrentMetrics
extern const Metric VersionInteger;
extern const Metric MemoryTracking;
extern const Metric MaxDDLEntryID;
extern const Metric MaxPushedDDLEntryID;
}
namespace fs = std::filesystem;
@ -1468,7 +1469,8 @@ if (ThreadFuzzer::instance().isEffective())
if (pool_size < 1)
throw Exception("distributed_ddl.pool_size should be greater then 0", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
global_context->setDDLWorker(std::make_unique<DDLWorker>(pool_size, ddl_zookeeper_path, global_context, &config(),
"distributed_ddl", "DDLWorker", &CurrentMetrics::MaxDDLEntryID));
"distributed_ddl", "DDLWorker",
&CurrentMetrics::MaxDDLEntryID, &CurrentMetrics::MaxPushedDDLEntryID));
}
for (auto & server : *servers)

View File

@ -60,6 +60,7 @@
M(BrokenDistributedFilesToInsert, "Number of files for asynchronous insertion into Distributed tables that has been marked as broken. This metric will starts from 0 on start. Number of files for every shard is summed.") \
M(TablesToDropQueueSize, "Number of dropped tables, that are waiting for background data removal.") \
M(MaxDDLEntryID, "Max processed DDL entry of DDLWorker.") \
M(MaxPushedDDLEntryID, "Max DDL entry of DDLWorker that pushed to zookeeper.") \
M(PartsTemporary, "The part is generating now, it is not in data_parts list.") \
M(PartsPreCommitted, "The part is in data_parts, but not used for SELECTs.") \
M(PartsCommitted, "Active data part, used by current and upcoming SELECTs.") \

View File

@ -158,15 +158,20 @@ DDLWorker::DDLWorker(
const Poco::Util::AbstractConfiguration * config,
const String & prefix,
const String & logger_name,
const CurrentMetrics::Metric * max_entry_metric_)
const CurrentMetrics::Metric * max_entry_metric_,
const CurrentMetrics::Metric * max_pushed_entry_metric_)
: context(Context::createCopy(context_))
, log(&Poco::Logger::get(logger_name))
, pool_size(pool_size_)
, max_entry_metric(max_entry_metric_)
, max_pushed_entry_metric(max_pushed_entry_metric_)
{
if (max_entry_metric)
CurrentMetrics::set(*max_entry_metric, 0);
if (max_pushed_entry_metric)
CurrentMetrics::set(*max_pushed_entry_metric, 0);
if (1 < pool_size)
{
LOG_WARNING(log, "DDLWorker is configured to use multiple threads. "
@ -1046,6 +1051,15 @@ String DDLWorker::enqueueQuery(DDLLogEntry & entry)
zookeeper->createAncestors(query_path_prefix);
String node_path = zookeeper->create(query_path_prefix, entry.toString(), zkutil::CreateMode::PersistentSequential);
if (max_pushed_entry_metric)
{
String str_buf = node_path.substr(query_path_prefix.length());
DB::ReadBufferFromString in(str_buf);
CurrentMetrics::Metric id;
readText(id, in);
id = std::max(*max_pushed_entry_metric, id);
CurrentMetrics::set(*max_pushed_entry_metric, id);
}
/// We cannot create status dirs in a single transaction with previous request,
/// because we don't know node_path until previous request is executed.

View File

@ -44,7 +44,7 @@ class DDLWorker
{
public:
DDLWorker(int pool_size_, const std::string & zk_root_dir, ContextPtr context_, const Poco::Util::AbstractConfiguration * config, const String & prefix,
const String & logger_name = "DDLWorker", const CurrentMetrics::Metric * max_entry_metric_ = nullptr);
const String & logger_name = "DDLWorker", const CurrentMetrics::Metric * max_entry_metric_ = nullptr, const CurrentMetrics::Metric * max_pushed_entry_metric_ = nullptr);
virtual ~DDLWorker();
/// Pushes query into DDL queue, returns path to created node
@ -148,6 +148,7 @@ protected:
std::atomic<UInt64> max_id = 0;
const CurrentMetrics::Metric * max_entry_metric;
const CurrentMetrics::Metric * max_pushed_entry_metric;
};