mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-24 00:22:29 +00:00
Merge pull request #27174 from fuwhu/feature/add-metric-MaxPushedDDLEntryID
Add metric MaxPushedDDLEntryID
This commit is contained in:
commit
918a69e70b
@ -126,6 +126,7 @@ namespace CurrentMetrics
|
||||
extern const Metric VersionInteger;
|
||||
extern const Metric MemoryTracking;
|
||||
extern const Metric MaxDDLEntryID;
|
||||
extern const Metric MaxPushedDDLEntryID;
|
||||
}
|
||||
|
||||
namespace fs = std::filesystem;
|
||||
@ -1468,7 +1469,8 @@ if (ThreadFuzzer::instance().isEffective())
|
||||
if (pool_size < 1)
|
||||
throw Exception("distributed_ddl.pool_size should be greater then 0", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
|
||||
global_context->setDDLWorker(std::make_unique<DDLWorker>(pool_size, ddl_zookeeper_path, global_context, &config(),
|
||||
"distributed_ddl", "DDLWorker", &CurrentMetrics::MaxDDLEntryID));
|
||||
"distributed_ddl", "DDLWorker",
|
||||
&CurrentMetrics::MaxDDLEntryID, &CurrentMetrics::MaxPushedDDLEntryID));
|
||||
}
|
||||
|
||||
for (auto & server : *servers)
|
||||
|
@ -60,6 +60,7 @@
|
||||
M(BrokenDistributedFilesToInsert, "Number of files for asynchronous insertion into Distributed tables that has been marked as broken. This metric will starts from 0 on start. Number of files for every shard is summed.") \
|
||||
M(TablesToDropQueueSize, "Number of dropped tables, that are waiting for background data removal.") \
|
||||
M(MaxDDLEntryID, "Max processed DDL entry of DDLWorker.") \
|
||||
M(MaxPushedDDLEntryID, "Max DDL entry of DDLWorker that pushed to zookeeper.") \
|
||||
M(PartsTemporary, "The part is generating now, it is not in data_parts list.") \
|
||||
M(PartsPreCommitted, "The part is in data_parts, but not used for SELECTs.") \
|
||||
M(PartsCommitted, "Active data part, used by current and upcoming SELECTs.") \
|
||||
|
@ -158,15 +158,20 @@ DDLWorker::DDLWorker(
|
||||
const Poco::Util::AbstractConfiguration * config,
|
||||
const String & prefix,
|
||||
const String & logger_name,
|
||||
const CurrentMetrics::Metric * max_entry_metric_)
|
||||
const CurrentMetrics::Metric * max_entry_metric_,
|
||||
const CurrentMetrics::Metric * max_pushed_entry_metric_)
|
||||
: context(Context::createCopy(context_))
|
||||
, log(&Poco::Logger::get(logger_name))
|
||||
, pool_size(pool_size_)
|
||||
, max_entry_metric(max_entry_metric_)
|
||||
, max_pushed_entry_metric(max_pushed_entry_metric_)
|
||||
{
|
||||
if (max_entry_metric)
|
||||
CurrentMetrics::set(*max_entry_metric, 0);
|
||||
|
||||
if (max_pushed_entry_metric)
|
||||
CurrentMetrics::set(*max_pushed_entry_metric, 0);
|
||||
|
||||
if (1 < pool_size)
|
||||
{
|
||||
LOG_WARNING(log, "DDLWorker is configured to use multiple threads. "
|
||||
@ -1046,6 +1051,15 @@ String DDLWorker::enqueueQuery(DDLLogEntry & entry)
|
||||
zookeeper->createAncestors(query_path_prefix);
|
||||
|
||||
String node_path = zookeeper->create(query_path_prefix, entry.toString(), zkutil::CreateMode::PersistentSequential);
|
||||
if (max_pushed_entry_metric)
|
||||
{
|
||||
String str_buf = node_path.substr(query_path_prefix.length());
|
||||
DB::ReadBufferFromString in(str_buf);
|
||||
CurrentMetrics::Metric id;
|
||||
readText(id, in);
|
||||
id = std::max(*max_pushed_entry_metric, id);
|
||||
CurrentMetrics::set(*max_pushed_entry_metric, id);
|
||||
}
|
||||
|
||||
/// We cannot create status dirs in a single transaction with previous request,
|
||||
/// because we don't know node_path until previous request is executed.
|
||||
|
@ -44,7 +44,7 @@ class DDLWorker
|
||||
{
|
||||
public:
|
||||
DDLWorker(int pool_size_, const std::string & zk_root_dir, ContextPtr context_, const Poco::Util::AbstractConfiguration * config, const String & prefix,
|
||||
const String & logger_name = "DDLWorker", const CurrentMetrics::Metric * max_entry_metric_ = nullptr);
|
||||
const String & logger_name = "DDLWorker", const CurrentMetrics::Metric * max_entry_metric_ = nullptr, const CurrentMetrics::Metric * max_pushed_entry_metric_ = nullptr);
|
||||
virtual ~DDLWorker();
|
||||
|
||||
/// Pushes query into DDL queue, returns path to created node
|
||||
@ -148,6 +148,7 @@ protected:
|
||||
|
||||
std::atomic<UInt64> max_id = 0;
|
||||
const CurrentMetrics::Metric * max_entry_metric;
|
||||
const CurrentMetrics::Metric * max_pushed_entry_metric;
|
||||
};
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user