Merge branch 'report-running-ddl-worker-hosts' into support-ddl-on-cluster-stop-waiting-offline-hosts

This commit is contained in:
Tuan Pham Anh 2024-09-17 07:39:24 +00:00
commit 5d289505f7
3 changed files with 146 additions and 35 deletions

View File

@ -40,7 +40,7 @@ public:
UInt64 getCurrentInitializationDurationMs() const;
private:
bool initializeMainThread() override;
void initializeReplication();
void initializeReplication() override;
void initializeLogPointer(const String & processed_entry_name);
DDLTaskPtr initAndCheckTask(const String & entry_name, String & out_reason, const ZooKeeperPtr & zookeeper) override;

View File

@ -1,48 +1,47 @@
#include <filesystem>
#include <Interpreters/DDLWorker.h>
#include <Core/ServerUUID.h>
#include <Core/Settings.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/Context.h>
#include <Interpreters/DDLTask.h>
#include <Interpreters/DDLWorker.h>
#include <Interpreters/ZooKeeperLog.h>
#include <Interpreters/executeQuery.h>
#include <Parsers/ASTAlterQuery.h>
#include <Parsers/ASTCreateIndexQuery.h>
#include <Parsers/ASTDropIndexQuery.h>
#include <Parsers/ASTDropQuery.h>
#include <Parsers/ASTOptimizeQuery.h>
#include <Parsers/ASTQueryWithOnCluster.h>
#include <Parsers/ASTQueryWithTableAndOutput.h>
#include <Parsers/ASTCreateIndexQuery.h>
#include <Parsers/ASTDropIndexQuery.h>
#include <Parsers/ParserQuery.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadBufferFromString.h>
#include <Storages/IStorage.h>
#include <Interpreters/executeQuery.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/Context.h>
#include <Common/OpenTelemetryTraceContext.h>
#include <Common/setThreadName.h>
#include <Common/randomSeed.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/ZooKeeper/ZooKeeperLock.h>
#include <Common/isLocalAddress.h>
#include <Core/ServerUUID.h>
#include <Core/Settings.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Poco/Timestamp.h>
#include <base/sleep.h>
#include <base/getFQDNOrHostName.h>
#include <Common/OpenTelemetryTraceContext.h>
#include <Common/ThreadPool.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/ZooKeeperLock.h>
#include <Common/isLocalAddress.h>
#include <Common/logger_useful.h>
#include <Common/randomSeed.h>
#include <Common/scope_guard_safe.h>
#include <Common/setThreadName.h>
#include <base/getFQDNOrHostName.h>
#include <base/sleep.h>
#include <base/sort.h>
#include <memory>
#include <random>
#include <pcg_random.hpp>
#include <Common/scope_guard_safe.h>
#include <Common/ThreadPool.h>
#include <Interpreters/ZooKeeperLog.h>
namespace fs = std::filesystem;
namespace CurrentMetrics
{
extern const Metric DDLWorkerThreads;
@ -102,6 +101,12 @@ DDLWorker::DDLWorker(
if (queue_dir.back() == '/')
queue_dir.resize(queue_dir.size() - 1);
// replicas_dir is at the same level as queue_dir
// E.g:
// queue_dir: /clickhouse/task_queue/ddl
// replicas_dir: /clickhouse/task_queue/replicas
replicas_dir = fs::path(queue_dir).parent_path() / "replicas";
if (config)
{
task_max_lifetime = config->getUInt64(prefix + ".task_max_lifetime", static_cast<UInt64>(task_max_lifetime));
@ -1059,6 +1064,11 @@ String DDLWorker::enqueueQuery(DDLLogEntry & entry)
String query_path_prefix = fs::path(queue_dir) / "query-";
zookeeper->createAncestors(query_path_prefix);
NameSet host_ids;
for (const HostID & host : entry.hosts)
host_ids.emplace(host.toString());
createReplicaDirs(zookeeper, host_ids);
String node_path = zookeeper->create(query_path_prefix, entry.toString(), zkutil::CreateMode::PersistentSequential);
if (max_pushed_entry_metric)
{
@ -1098,6 +1108,7 @@ bool DDLWorker::initializeMainThread()
{
auto zookeeper = getAndSetZooKeeper();
zookeeper->createAncestors(fs::path(queue_dir) / "");
initializeReplication();
initialized = true;
return true;
}
@ -1159,6 +1170,7 @@ void DDLWorker::runMainThread()
}
cleanup_event->set();
markReplicasActive(reinitialized);
scheduleTasks(reinitialized);
subsequent_errors_count = 0;
@ -1216,6 +1228,97 @@ void DDLWorker::runMainThread()
}
void DDLWorker::initializeReplication()
{
auto zookeeper = getAndSetZooKeeper();
zookeeper->createAncestors(replicas_dir / "");
NameSet host_id_set;
for (const auto & it : context->getClusters())
{
auto cluster = it.second;
for (const auto & host_ids : cluster->getHostIDs())
for (const auto & host_id : host_ids)
host_id_set.emplace(host_id);
}
createReplicaDirs(zookeeper, host_id_set);
}
void DDLWorker::createReplicaDirs(const ZooKeeperPtr & zookeeper, const NameSet & host_ids)
{
for (const auto & host_id : host_ids)
zookeeper->createAncestors(replicas_dir / host_id / "");
}
void DDLWorker::markReplicasActive(bool reinitialized)
{
auto zookeeper = getAndSetZooKeeper();
if (reinitialized)
{
// Reset all active_node_holders
for (auto & it : active_node_holders)
{
auto & active_node_holder = it.second.second;
if (active_node_holder)
active_node_holder->setAlreadyRemoved();
active_node_holder.reset();
}
active_node_holders.clear();
}
const auto maybe_secure_port = context->getTCPPortSecure();
const auto port = context->getTCPPort();
Coordination::Stat replicas_stat;
Strings host_ids = zookeeper->getChildren(replicas_dir, &replicas_stat);
NameSet local_host_ids;
for (const auto & host_id : host_ids)
{
if (active_node_holders.contains(host_id))
continue;
try
{
HostID host = HostID::fromString(host_id);
/// The port is considered local if it matches TCP or TCP secure port that the server is listening.
bool is_local_host = (maybe_secure_port && host.isLocalAddress(*maybe_secure_port)) || host.isLocalAddress(port);
if (is_local_host)
local_host_ids.emplace(host_id);
}
catch (const Exception & e)
{
LOG_WARNING(log, "Unable to check if host {} is a local address, exception: {}", host_id, e.displayText());
continue;
}
}
for (const auto & host_id : local_host_ids)
{
auto it = active_node_holders.find(host_id);
if (it != active_node_holders.end())
{
continue;
}
/// Create "active" node (remove previous one if necessary)
String active_path = replicas_dir / host_id / "active";
String active_id = toString(ServerUUID::get());
zookeeper->deleteEphemeralNodeIfContentMatches(active_path, active_id);
LOG_TRACE(log, "Trying to mark a replica active: active_path={}, active_id={}", active_path, active_id);
zookeeper->create(active_path, active_id, zkutil::CreateMode::Ephemeral);
auto active_node_holder_zookeeper = zookeeper;
auto active_node_holder = zkutil::EphemeralNodeHolder::existing(active_path, *active_node_holder_zookeeper);
active_node_holders[host_id] = {active_node_holder_zookeeper, active_node_holder};
}
}
void DDLWorker::runCleanupThread()
{
setThreadName("DDLWorkerClnr");

View File

@ -1,22 +1,23 @@
#pragma once
#include <Common/CurrentThread.h>
#include <Interpreters/Context.h>
#include <Parsers/IAST_fwd.h>
#include <Storages/IStorage_fwd.h>
#include <Common/CurrentMetrics.h>
#include <Common/CurrentThread.h>
#include <Common/DNSResolver.h>
#include <Common/ThreadPool_fwd.h>
#include <Common/ZooKeeper/IKeeper.h>
#include <Storages/IStorage_fwd.h>
#include <Parsers/IAST_fwd.h>
#include <Interpreters/Context.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <filesystem>
#include <mutex>
#include <shared_mutex>
#include <thread>
#include <unordered_set>
namespace fs = std::filesystem;
namespace zkutil
{
class ZooKeeper;
@ -146,6 +147,10 @@ protected:
/// Return false if the worker was stopped (stop_flag = true)
virtual bool initializeMainThread();
virtual void initializeReplication();
void createReplicaDirs(const ZooKeeperPtr & zookeeper, const NameSet & host_ids);
void markReplicasActive(bool reinitialized);
void runMainThread();
void runCleanupThread();
@ -158,6 +163,7 @@ protected:
std::string host_fqdn; /// current host domain name
std::string host_fqdn_id; /// host_name:port
std::string queue_dir; /// dir with queue of queries
fs::path replicas_dir;
mutable std::mutex zookeeper_mutex;
ZooKeeperPtr current_zookeeper TSA_GUARDED_BY(zookeeper_mutex);
@ -199,6 +205,8 @@ protected:
const CurrentMetrics::Metric * max_entry_metric;
const CurrentMetrics::Metric * max_pushed_entry_metric;
std::unordered_map<String, std::pair<ZooKeeperPtr, zkutil::EphemeralNodeHolderPtr>> active_node_holders;
};