commit 7389ec4f0d by tuanpach, 2024-09-19 03:56:50 +04:00, committed by GitHub
GPG Key ID: B5690EEEBB952194 (no known key found for this signature in database)
6 changed files with 243 additions and 35 deletions

View File

@@ -40,7 +40,7 @@ public:
     UInt64 getCurrentInitializationDurationMs() const;
 
 private:
     bool initializeMainThread() override;
-    void initializeReplication();
+    void initializeReplication() override;
     void initializeLogPointer(const String & processed_entry_name);
     DDLTaskPtr initAndCheckTask(const String & entry_name, String & out_reason, const ZooKeeperPtr & zookeeper) override;

View File

@@ -1,48 +1,47 @@
-#include <Interpreters/DDLWorker.h>
+#include <filesystem>
+
+#include <Core/ServerUUID.h>
+#include <Core/Settings.h>
+#include <IO/ReadBufferFromString.h>
+#include <IO/ReadHelpers.h>
+#include <IO/WriteHelpers.h>
+#include <Interpreters/Cluster.h>
+#include <Interpreters/Context.h>
 #include <Interpreters/DDLTask.h>
+#include <Interpreters/DDLWorker.h>
+#include <Interpreters/ZooKeeperLog.h>
+#include <Interpreters/executeQuery.h>
 #include <Parsers/ASTAlterQuery.h>
+#include <Parsers/ASTCreateIndexQuery.h>
+#include <Parsers/ASTDropIndexQuery.h>
 #include <Parsers/ASTDropQuery.h>
 #include <Parsers/ASTOptimizeQuery.h>
 #include <Parsers/ASTQueryWithOnCluster.h>
 #include <Parsers/ASTQueryWithTableAndOutput.h>
-#include <Parsers/ASTCreateIndexQuery.h>
-#include <Parsers/ASTDropIndexQuery.h>
 #include <Parsers/ParserQuery.h>
-#include <IO/WriteHelpers.h>
-#include <IO/ReadHelpers.h>
-#include <IO/ReadBufferFromString.h>
 #include <Storages/IStorage.h>
-#include <Interpreters/executeQuery.h>
-#include <Interpreters/Cluster.h>
-#include <Interpreters/Context.h>
-#include <Common/OpenTelemetryTraceContext.h>
-#include <Common/setThreadName.h>
-#include <Common/randomSeed.h>
-#include <Common/ZooKeeper/ZooKeeper.h>
-#include <Common/ZooKeeper/KeeperException.h>
-#include <Common/ZooKeeper/ZooKeeperLock.h>
-#include <Common/isLocalAddress.h>
-#include <Core/ServerUUID.h>
-#include <Core/Settings.h>
 #include <Storages/StorageReplicatedMergeTree.h>
 #include <Poco/Timestamp.h>
-#include <base/sleep.h>
-#include <base/getFQDNOrHostName.h>
+#include <Common/OpenTelemetryTraceContext.h>
+#include <Common/ThreadPool.h>
+#include <Common/ZooKeeper/KeeperException.h>
+#include <Common/ZooKeeper/ZooKeeper.h>
+#include <Common/ZooKeeper/ZooKeeperLock.h>
+#include <Common/isLocalAddress.h>
 #include <Common/logger_useful.h>
+#include <Common/randomSeed.h>
+#include <Common/scope_guard_safe.h>
+#include <Common/setThreadName.h>
+#include <base/getFQDNOrHostName.h>
+#include <base/sleep.h>
 #include <base/sort.h>
+
 #include <memory>
 #include <random>
 #include <pcg_random.hpp>
-#include <Common/scope_guard_safe.h>
-#include <Common/ThreadPool.h>
-#include <Interpreters/ZooKeeperLog.h>
 
 namespace fs = std::filesystem;
 
 namespace CurrentMetrics
 {
     extern const Metric DDLWorkerThreads;
@@ -102,6 +101,12 @@ DDLWorker::DDLWorker(
     if (queue_dir.back() == '/')
         queue_dir.resize(queue_dir.size() - 1);
 
+    // replicas_dir is at the same level as queue_dir
+    // E.g:
+    //   queue_dir:    /clickhouse/task_queue/ddl
+    //   replicas_dir: /clickhouse/task_queue/replicas
+    replicas_dir = fs::path(queue_dir).parent_path() / "replicas";
+
     if (config)
     {
         task_max_lifetime = config->getUInt64(prefix + ".task_max_lifetime", static_cast<UInt64>(task_max_lifetime));
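As the comment in the constructor notes, replicas_dir is obtained by taking the parent of queue_dir and appending "replicas". A tiny illustrative sketch of that derivation (not part of the commit), with Python's pathlib standing in for fs::path:

# Illustration only: PurePosixPath stands in for std::filesystem::path.
from pathlib import PurePosixPath

queue_dir = PurePosixPath("/clickhouse/task_queue/ddl")
replicas_dir = queue_dir.parent / "replicas"  # same rule as fs::path(queue_dir).parent_path() / "replicas"
assert str(replicas_dir) == "/clickhouse/task_queue/replicas"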
@@ -1059,6 +1064,11 @@ String DDLWorker::enqueueQuery(DDLLogEntry & entry)
     String query_path_prefix = fs::path(queue_dir) / "query-";
     zookeeper->createAncestors(query_path_prefix);
 
+    NameSet host_ids;
+    for (const HostID & host : entry.hosts)
+        host_ids.emplace(host.toString());
+    createReplicaDirs(zookeeper, host_ids);
+
     String node_path = zookeeper->create(query_path_prefix, entry.toString(), zkutil::CreateMode::PersistentSequential);
     if (max_pushed_entry_metric)
     {
@@ -1098,6 +1108,7 @@ bool DDLWorker::initializeMainThread()
         {
             auto zookeeper = getAndSetZooKeeper();
             zookeeper->createAncestors(fs::path(queue_dir) / "");
+            initializeReplication();
             initialized = true;
             return true;
         }
@@ -1159,6 +1170,7 @@ void DDLWorker::runMainThread()
             }
 
             cleanup_event->set();
+            markReplicasActive(reinitialized);
             scheduleTasks(reinitialized);
 
             subsequent_errors_count = 0;
@@ -1216,6 +1228,97 @@ void DDLWorker::runMainThread()
 }
 
+void DDLWorker::initializeReplication()
+{
+    auto zookeeper = getAndSetZooKeeper();
+
+    zookeeper->createAncestors(replicas_dir / "");
+
+    NameSet host_id_set;
+    for (const auto & it : context->getClusters())
+    {
+        auto cluster = it.second;
+        for (const auto & host_ids : cluster->getHostIDs())
+            for (const auto & host_id : host_ids)
+                host_id_set.emplace(host_id);
+    }
+
+    createReplicaDirs(zookeeper, host_id_set);
+}
+
+void DDLWorker::createReplicaDirs(const ZooKeeperPtr & zookeeper, const NameSet & host_ids)
+{
+    for (const auto & host_id : host_ids)
+        zookeeper->createAncestors(replicas_dir / host_id / "");
+}
+
+void DDLWorker::markReplicasActive(bool reinitialized)
+{
+    auto zookeeper = getAndSetZooKeeper();
+
+    if (reinitialized)
+    {
+        // Reset all active_node_holders
+        for (auto & it : active_node_holders)
+        {
+            auto & active_node_holder = it.second.second;
+            if (active_node_holder)
+                active_node_holder->setAlreadyRemoved();
+            active_node_holder.reset();
+        }
+
+        active_node_holders.clear();
+    }
+
+    const auto maybe_secure_port = context->getTCPPortSecure();
+    const auto port = context->getTCPPort();
+
+    Coordination::Stat replicas_stat;
+    Strings host_ids = zookeeper->getChildren(replicas_dir, &replicas_stat);
+    NameSet local_host_ids;
+    for (const auto & host_id : host_ids)
+    {
+        if (active_node_holders.contains(host_id))
+            continue;
+
+        try
+        {
+            HostID host = HostID::fromString(host_id);
+            /// The port is considered local if it matches TCP or TCP secure port that the server is listening.
+            bool is_local_host = (maybe_secure_port && host.isLocalAddress(*maybe_secure_port)) || host.isLocalAddress(port);
+
+            if (is_local_host)
+                local_host_ids.emplace(host_id);
+        }
+        catch (const Exception & e)
+        {
+            LOG_WARNING(log, "Unable to check if host {} is a local address, exception: {}", host_id, e.displayText());
+            continue;
+        }
+    }
+
+    for (const auto & host_id : local_host_ids)
+    {
+        auto it = active_node_holders.find(host_id);
+        if (it != active_node_holders.end())
+        {
+            continue;
+        }
+
+        /// Create "active" node (remove previous one if necessary)
+        String active_path = replicas_dir / host_id / "active";
+        String active_id = toString(ServerUUID::get());
+        zookeeper->deleteEphemeralNodeIfContentMatches(active_path, active_id);
+
+        LOG_TRACE(log, "Trying to mark a replica active: active_path={}, active_id={}", active_path, active_id);
+
+        zookeeper->create(active_path, active_id, zkutil::CreateMode::Ephemeral);
+
+        auto active_node_holder_zookeeper = zookeeper;
+        auto active_node_holder = zkutil::EphemeralNodeHolder::existing(active_path, *active_node_holder_zookeeper);
+        active_node_holders[host_id] = {active_node_holder_zookeeper, active_node_holder};
+    }
+}
 
 void DDLWorker::runCleanupThread()
 {
     setThreadName("DDLWorkerClnr");
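Taken together, initializeReplication(), createReplicaDirs() and markReplicasActive() maintain one directory per host ID under <queue_dir>/../replicas, each holding an ephemeral "active" child whose value is the server UUID. Below is a minimal sketch (not part of the commit) of how that layout can be inspected from Python; it assumes the kazoo client library, a ZooKeeper reachable at localhost:2181, and the default /clickhouse/task_queue prefix.

# Sketch: list the replica registry maintained by DDLWorker (assumptions noted above).
from kazoo.client import KazooClient

zk = KazooClient(hosts="localhost:2181")
zk.start()

replicas_dir = "/clickhouse/task_queue/replicas"
for host_id in zk.get_children(replicas_dir):  # e.g. "node1:9000"
    active_path = f"{replicas_dir}/{host_id}/active"
    if zk.exists(active_path):
        value, stat = zk.get(active_path)
        # value holds the server UUID; a non-zero ephemeralOwner means the node is ephemeral
        print(host_id, value.decode(), stat.ephemeralOwner)
    else:
        print(host_id, "inactive")

zk.stop()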

View File

@@ -1,22 +1,23 @@
 #pragma once
 
-#include <Common/CurrentThread.h>
+#include <Interpreters/Context.h>
+#include <Parsers/IAST_fwd.h>
+#include <Storages/IStorage_fwd.h>
 #include <Common/CurrentMetrics.h>
+#include <Common/CurrentThread.h>
 #include <Common/DNSResolver.h>
 #include <Common/ThreadPool_fwd.h>
 #include <Common/ZooKeeper/IKeeper.h>
-#include <Storages/IStorage_fwd.h>
-#include <Parsers/IAST_fwd.h>
-#include <Interpreters/Context.h>
+#include <Common/ZooKeeper/ZooKeeper.h>
 
 #include <atomic>
-#include <chrono>
-#include <condition_variable>
+#include <filesystem>
 #include <mutex>
 #include <shared_mutex>
-#include <thread>
 #include <unordered_set>
 
+namespace fs = std::filesystem;
+
 namespace zkutil
 {
     class ZooKeeper;
@@ -146,6 +147,10 @@ protected:
     /// Return false if the worker was stopped (stop_flag = true)
     virtual bool initializeMainThread();
+    virtual void initializeReplication();
+
+    void createReplicaDirs(const ZooKeeperPtr & zookeeper, const NameSet & host_ids);
+    void markReplicasActive(bool reinitialized);
 
     void runMainThread();
     void runCleanupThread();
 
@@ -157,7 +162,8 @@ protected:
     std::string host_fqdn; /// current host domain name
     std::string host_fqdn_id; /// host_name:port
     std::string queue_dir; /// dir with queue of queries
+    fs::path replicas_dir;
 
     mutable std::mutex zookeeper_mutex;
     ZooKeeperPtr current_zookeeper TSA_GUARDED_BY(zookeeper_mutex);
 
@@ -199,6 +205,8 @@ protected:
     const CurrentMetrics::Metric * max_entry_metric;
     const CurrentMetrics::Metric * max_pushed_entry_metric;
+
+    std::unordered_map<String, std::pair<ZooKeeperPtr, zkutil::EphemeralNodeHolderPtr>> active_node_holders;
 };

View File

@@ -0,0 +1,30 @@
<clickhouse>
    <remote_servers>
        <test_cluster>
            <shard>
                <internal_replication>true</internal_replication>
                <replica>
                    <host>node1</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>node2</host>
                    <port>9000</port>
                </replica>
            </shard>
            <shard>
                <internal_replication>true</internal_replication>
                <replica>
                    <host>node3</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>node4</host>
                    <port>9000</port>
                </replica>
            </shard>
        </test_cluster>
    </remote_servers>

    <allow_zookeeper_write>1</allow_zookeeper_write>
</clickhouse>
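The cluster definition above declares two shards with two replicas each. DDLWorker turns every replica into a host ID of the form host:port, and those IDs become the directory names under .../replicas (the test below checks exactly this list). A small illustrative Python snippet (not part of the commit; the remote_servers.xml path is an assumption) that derives the expected IDs from this file:

# Sketch: derive the expected "host:port" IDs from the cluster config (assumed local file path).
import xml.etree.ElementTree as ET

tree = ET.parse("remote_servers.xml")
host_ids = [
    f"{replica.findtext('host')}:{replica.findtext('port')}"
    for replica in tree.getroot().iter("replica")
]
print(host_ids)  # ['node1:9000', 'node2:9000', 'node3:9000', 'node4:9000']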

View File

@@ -0,0 +1,67 @@
import pytest
import time

from helpers.cluster import ClickHouseCluster

cluster = ClickHouseCluster(__file__)

node1 = cluster.add_instance(
    "node1",
    main_configs=["configs/remote_servers.xml"],
    with_zookeeper=True,
    stay_alive=True,
)
node2 = cluster.add_instance(
    "node2", main_configs=["configs/remote_servers.xml"], with_zookeeper=True
)
node3 = cluster.add_instance(
    "node3", main_configs=["configs/remote_servers.xml"], with_zookeeper=True
)
node4 = cluster.add_instance(
    "node4", main_configs=["configs/remote_servers.xml"], with_zookeeper=True
)


@pytest.fixture(scope="module")
def started_cluster():
    try:
        cluster.start()
        yield cluster
    finally:
        cluster.shutdown()
def test_ddl_worker_replicas(started_cluster):
    replica_list = node1.query(
        "SELECT name FROM system.zookeeper WHERE path='/clickhouse/task_queue/replicas'"
    ).strip()
    replica_list = list(replica_list.split("\n"))
    expected_replicas = ["node1:9000", "node2:9000", "node3:9000", "node4:9000"]
    assert sorted(expected_replicas) == sorted(replica_list)

    for replica in replica_list:
        result = node1.query(
            f"SELECT name, value, ephemeralOwner FROM system.zookeeper WHERE path='/clickhouse/task_queue/replicas/{replica}'"
        ).strip()
        lines = list(result.split("\n"))
        assert len(lines) == 1
        parts = list(lines[0].split("\t"))
        assert len(parts) == 3
        assert parts[0] == "active"
        assert len(parts[1]) != 0
        assert len(parts[2]) != 0

    node4.stop()
    time.sleep(1)

    result = node1.query(
        "SELECT name, value, ephemeralOwner FROM system.zookeeper WHERE path='/clickhouse/task_queue/replicas/node4:9000'"
    ).strip()
    lines = list(result.split("\n"))
    assert len(lines) == 1
    assert len(lines[0]) == 0
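The fixed time.sleep(1) only works if node4's ZooKeeper session is closed promptly after the stop; a more robust variant (a sketch built on the same test helpers, not part of the commit) polls until the ephemeral node is gone:

# Sketch: wait for the replica to drop its ephemeral "active" node instead of sleeping a fixed time.
def wait_replica_inactive(host_id, timeout=30):
    deadline = time.time() + timeout
    while time.time() < deadline:
        result = node1.query(
            f"SELECT name FROM system.zookeeper WHERE path='/clickhouse/task_queue/replicas/{host_id}'"
        ).strip()
        if result == "":
            return
        time.sleep(0.5)
    raise AssertionError(f"{host_id} is still marked active after {timeout}s")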