Merge pull request #20448 from ClickHouse/better_ddl_queue_cleanup

Better distributed DDL queue cleanup
tavplubix 2021-02-16 19:29:50 +03:00 committed by GitHub
commit 68b427a99d
9 changed files with 118 additions and 82 deletions


@@ -602,7 +602,7 @@ void ZooKeeper::removeChildren(const std::string & path)
}
void ZooKeeper::removeChildrenRecursive(const std::string & path)
void ZooKeeper::removeChildrenRecursive(const std::string & path, const String & keep_child_node)
{
Strings children = getChildren(path);
while (!children.empty())
@@ -611,14 +611,15 @@ void ZooKeeper::removeChildrenRecursive(const std::string & path)
for (size_t i = 0; i < MULTI_BATCH_SIZE && !children.empty(); ++i)
{
removeChildrenRecursive(path + "/" + children.back());
ops.emplace_back(makeRemoveRequest(path + "/" + children.back(), -1));
if (likely(keep_child_node.empty() || keep_child_node != children.back()))
ops.emplace_back(makeRemoveRequest(path + "/" + children.back(), -1));
children.pop_back();
}
multi(ops);
}
}
void ZooKeeper::tryRemoveChildrenRecursive(const std::string & path)
void ZooKeeper::tryRemoveChildrenRecursive(const std::string & path, const String & keep_child_node)
{
Strings children;
if (tryGetChildren(path, children) != Coordination::Error::ZOK)
@@ -629,14 +630,14 @@ void ZooKeeper::tryRemoveChildrenRecursive(const std::string & path)
Strings batch;
for (size_t i = 0; i < MULTI_BATCH_SIZE && !children.empty(); ++i)
{
batch.push_back(path + "/" + children.back());
String child_path = path + "/" + children.back();
tryRemoveChildrenRecursive(child_path);
if (likely(keep_child_node.empty() || keep_child_node != children.back()))
{
batch.push_back(child_path);
ops.emplace_back(zkutil::makeRemoveRequest(child_path, -1));
}
children.pop_back();
tryRemoveChildrenRecursive(batch.back());
Coordination::RemoveRequest request;
request.path = batch.back();
ops.emplace_back(std::make_shared<Coordination::RemoveRequest>(std::move(request)));
}
/// Try to remove the children with a faster method - in bulk. If this fails,


@@ -184,6 +184,12 @@ public:
/// result would be the same as for the single call.
void tryRemoveRecursive(const std::string & path);
/// Similar to removeRecursive(...) and tryRemoveRecursive(...), but does not remove path itself.
/// If keep_child_node is not empty, this method will not remove path/keep_child_node (but will remove its subtree).
/// It can be useful to keep some child node as a flag which indicates that path is currently removing.
void removeChildrenRecursive(const std::string & path, const String & keep_child_node = {});
void tryRemoveChildrenRecursive(const std::string & path, const String & keep_child_node = {});
/// Remove all children nodes (non recursive).
void removeChildren(const std::string & path);
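For orientation, here is a minimal usage sketch of the new keep_child_node parameter, assuming the zkutil API declared above; the function name and the way the path is obtained are illustrative, not part of this change:

// Sketch only. Assumes #include <Common/ZooKeeper/ZooKeeper.h> and a connected session.
// Clears a DDL entry's subtree but keeps entry_path/finished itself (its own subtree is
// still removed), so a stale host that still sees the entry cannot recreate
// entry_path/active later.
void clearEntryKeepingFinished(zkutil::ZooKeeper & zookeeper, const std::string & entry_path)
{
    zookeeper.tryRemoveChildrenRecursive(entry_path, "finished");
}
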
@@ -246,9 +252,6 @@ private:
void init(const std::string & implementation_, const std::string & hosts_, const std::string & identity_,
int32_t session_timeout_ms_, int32_t operation_timeout_ms_, const std::string & chroot_);
void removeChildrenRecursive(const std::string & path);
void tryRemoveChildrenRecursive(const std::string & path);
/// The following methods don't throw exceptions but return error codes.
Coordination::Error createImpl(const std::string & path, const std::string & data, int32_t mode, std::string & path_created);
Coordination::Error removeImpl(const std::string & path, int32_t version);
@@ -320,7 +323,7 @@ public:
catch (...)
{
ProfileEvents::increment(ProfileEvents::CannotRemoveEphemeralNode);
DB::tryLogCurrentException(__PRETTY_FUNCTION__);
DB::tryLogCurrentException(__PRETTY_FUNCTION__, "Cannot remove " + path + ": ");
}
}


@@ -652,15 +652,10 @@ void DDLWorker::enqueueTask(DDLTaskPtr task_ptr)
{
recoverZooKeeper();
}
else if (e.code == Coordination::Error::ZNONODE)
{
LOG_ERROR(log, "ZooKeeper error: {}", getCurrentExceptionMessage(true));
// TODO: retry?
}
else
{
LOG_ERROR(log, "Unexpected ZooKeeper error: {}.", getCurrentExceptionMessage(true));
return;
throw;
}
}
catch (...)
@@ -695,25 +690,44 @@ void DDLWorker::processTask(DDLTask & task)
LOG_DEBUG(log, "Processing task {} ({})", task.entry_name, task.entry.query);
String dummy;
String active_node_path = task.entry_path + "/active/" + task.host_id_str;
String finished_node_path = task.entry_path + "/finished/" + task.host_id_str;
auto code = zookeeper->tryCreate(active_node_path, "", zkutil::CreateMode::Ephemeral, dummy);
/// It will tryRemove(...) on exception
auto active_node = zkutil::EphemeralNodeHolder::existing(active_node_path, *zookeeper);
if (code == Coordination::Error::ZOK || code == Coordination::Error::ZNODEEXISTS)
/// Try fast path
auto create_active_res = zookeeper->tryCreate(active_node_path, {}, zkutil::CreateMode::Ephemeral);
if (create_active_res != Coordination::Error::ZOK)
{
// Ok
if (create_active_res != Coordination::Error::ZNONODE && create_active_res != Coordination::Error::ZNODEEXISTS)
{
assert(Coordination::isHardwareError(create_active_res));
throw Coordination::Exception(create_active_res, active_node_path);
}
/// Status dirs were not created in enqueueQuery(...) or someone is removing entry
if (create_active_res == Coordination::Error::ZNONODE)
createStatusDirs(task.entry_path, zookeeper);
if (create_active_res == Coordination::Error::ZNODEEXISTS)
{
/// Connection has been lost and now we are retrying to write query status,
/// but our previous ephemeral node still exists.
assert(task.was_executed);
zkutil::EventPtr eph_node_disappeared = std::make_shared<Poco::Event>();
String dummy;
if (zookeeper->tryGet(active_node_path, dummy, nullptr, eph_node_disappeared))
{
constexpr int timeout_ms = 5000;
if (!eph_node_disappeared->tryWait(timeout_ms))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Ephemeral node {} still exists, "
"probably it's owned by someone else", active_node_path);
}
}
zookeeper->create(active_node_path, {}, zkutil::CreateMode::Ephemeral);
}
else if (code == Coordination::Error::ZNONODE)
{
/// There is no parent
createStatusDirs(task.entry_path, zookeeper);
if (Coordination::Error::ZOK != zookeeper->tryCreate(active_node_path, "", zkutil::CreateMode::Ephemeral, dummy))
throw Coordination::Exception(code, active_node_path);
}
else
throw Coordination::Exception(code, active_node_path);
if (!task.was_executed)
{
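For readability, the new handling of the ephemeral active node above can be summarized in isolation. This is a sketch assuming the zkutil and Coordination APIs as they appear in this diff; the free function, the callback parameter and the plain std::runtime_error are illustrative:

// Sketch only. Assumes:
//   #include <Common/ZooKeeper/ZooKeeper.h>
//   #include <Common/ZooKeeper/KeeperException.h>
//   #include <Poco/Event.h>
//   #include <functional>
//   #include <memory>
//   #include <stdexcept>
void createActiveNode(zkutil::ZooKeeper & zookeeper, const String & active_node_path,
                      const std::function<void()> & create_status_dirs)
{
    /// Fast path: status dirs exist and no ephemeral node is left over from our previous session.
    auto res = zookeeper.tryCreate(active_node_path, {}, zkutil::CreateMode::Ephemeral);
    if (res == Coordination::Error::ZOK)
        return;

    if (res != Coordination::Error::ZNONODE && res != Coordination::Error::ZNODEEXISTS)
        throw Coordination::Exception(res, active_node_path);  /// connection (hardware) error

    if (res == Coordination::Error::ZNONODE)
    {
        /// Parent status dirs are missing; in the PR this is createStatusDirs(task.entry_path, zookeeper).
        create_status_dirs();
    }
    else  /// ZNODEEXISTS: the ephemeral node of our previous session has not expired yet
    {
        zkutil::EventPtr disappeared = std::make_shared<Poco::Event>();
        String data;
        if (zookeeper.tryGet(active_node_path, data, nullptr, disappeared))
        {
            constexpr int timeout_ms = 5000;  /// same value as in the diff
            if (!disappeared->tryWait(timeout_ms))
                throw std::runtime_error("Ephemeral node still exists: " + active_node_path);
        }
    }

    /// Slow path: retry the creation, throwing on any failure this time.
    zookeeper.create(active_node_path, {}, zkutil::CreateMode::Ephemeral);
}
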
@@ -969,7 +983,6 @@ void DDLWorker::cleanupQueue(Int64 current_time_seconds, const ZooKeeperPtr & zo
String node_name = *it;
String node_path = fs::path(queue_dir) / node_name;
String lock_path = fs::path(node_path) / "lock";
Coordination::Stat stat;
String dummy;
@@ -991,19 +1004,14 @@ void DDLWorker::cleanupQueue(Int64 current_time_seconds, const ZooKeeperPtr & zo
if (!node_lifetime_is_expired && !node_is_outside_max_window)
continue;
/// Skip if there are active nodes (it is weak guard)
if (zookeeper->exists(fs::path(node_path) / "active", &stat) && stat.numChildren > 0)
/// At first we remove the entry/active node to prevent stale hosts from executing the entry concurrently
auto rm_active_res = zookeeper->tryRemove(fs::path(node_path) / "active");
if (rm_active_res != Coordination::Error::ZOK && rm_active_res != Coordination::Error::ZNONODE)
{
LOG_INFO(log, "Task {} should be deleted, but there are active workers. Skipping it.", node_name);
continue;
}
/// Usage of the lock is not necessary now (tryRemoveRecursive correctly removes node in a presence of concurrent cleaners)
/// But the lock will be required to implement system.distributed_ddl_queue table
auto lock = createSimpleZooKeeperLock(zookeeper, node_path, "lock", host_fqdn_id);
if (!lock->tryLock())
{
LOG_INFO(log, "Task {} should be deleted, but it is locked. Skipping it.", node_name);
if (rm_active_res == Coordination::Error::ZNOTEMPTY)
LOG_DEBUG(log, "Task {} should be deleted, but there are active workers. Skipping it.", node_name);
else
LOG_WARNING(log, "Unexpected status code {} on attempt to remove {}/active", rm_active_res, node_name);
continue;
}
@@ -1012,21 +1020,33 @@ void DDLWorker::cleanupQueue(Int64 current_time_seconds, const ZooKeeperPtr & zo
else if (node_is_outside_max_window)
LOG_INFO(log, "Task {} is outdated, deleting it", node_name);
/// Deleting
{
Strings children = zookeeper->getChildren(node_path);
for (const String & child : children)
{
if (child != "lock")
zookeeper->tryRemoveRecursive(fs::path(node_path) / child);
}
/// We recursively delete all nodes except node_path/finished to prevent stale hosts from
/// creating node_path/active node (see createStatusDirs(...))
zookeeper->tryRemoveChildrenRecursive(node_path, "finished");
/// Remove the lock node and its parent atomically
Coordination::Requests ops;
ops.emplace_back(zkutil::makeRemoveRequest(lock_path, -1));
ops.emplace_back(zkutil::makeRemoveRequest(node_path, -1));
zookeeper->multi(ops);
/// And then we remove node_path and node_path/finished in a single transaction
Coordination::Requests ops;
Coordination::Responses res;
ops.emplace_back(zkutil::makeCheckRequest(node_path, -1)); /// See a comment below
ops.emplace_back(zkutil::makeRemoveRequest(fs::path(node_path) / "finished", -1));
ops.emplace_back(zkutil::makeRemoveRequest(node_path, -1));
auto rm_entry_res = zookeeper->tryMulti(ops, res);
if (rm_entry_res == Coordination::Error::ZNONODE)
{
/// Most likely both node_path/finished and node_path were removed concurrently.
bool entry_removed_concurrently = res[0]->error == Coordination::Error::ZNONODE;
if (entry_removed_concurrently)
continue;
/// Possible rare case: initiator node has lost connection after enqueueing entry and failed to create status dirs.
/// No one has started to process the entry, so node_path/active and node_path/finished nodes were never created, node_path has no children.
/// Entry became outdated, but we cannot remove it in a transaction with node_path/finished.
assert(res[0]->error == Coordination::Error::ZOK && res[1]->error == Coordination::Error::ZNONODE);
rm_entry_res = zookeeper->tryRemove(node_path);
assert(rm_entry_res != Coordination::Error::ZNOTEMPTY);
continue;
}
zkutil::KeeperMultiException::check(rm_entry_res, ops, res);
}
catch (...)
{
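The deletion order above is the core of the fix. A condensed sketch of it, assuming the zkutil API shown in this diff (the function name and boolean return are illustrative):

// Sketch only: the three-step removal performed by cleanupQueue() above.
// Assumes #include <Common/ZooKeeper/ZooKeeper.h> and <Common/ZooKeeper/KeeperException.h>.
bool tryRemoveQueueEntry(zkutil::ZooKeeper & zookeeper, const std::string & node_path)
{
    /// 1. Remove node_path/active first, so stale hosts cannot start executing the entry.
    auto rm_active = zookeeper.tryRemove(node_path + "/active");
    if (rm_active != Coordination::Error::ZOK && rm_active != Coordination::Error::ZNONODE)
        return false;  /// e.g. ZNOTEMPTY: active workers exist, skip the entry for now

    /// 2. Remove all children except node_path/finished; keeping "finished" prevents stale
    ///    hosts from recreating node_path/active through createStatusDirs().
    zookeeper.tryRemoveChildrenRecursive(node_path, "finished");

    /// 3. Remove node_path/finished and node_path itself in a single transaction.
    Coordination::Requests ops;
    Coordination::Responses res;
    ops.emplace_back(zkutil::makeCheckRequest(node_path, -1));
    ops.emplace_back(zkutil::makeRemoveRequest(node_path + "/finished", -1));
    ops.emplace_back(zkutil::makeRemoveRequest(node_path, -1));
    auto rm_entry = zookeeper.tryMulti(ops, res);
    if (rm_entry == Coordination::Error::ZNONODE)
    {
        /// Rare: either the entry was removed concurrently, or "finished" was never created.
        return zookeeper.tryRemove(node_path) == Coordination::Error::ZOK;
    }
    return rm_entry == Coordination::Error::ZOK;
}
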
@@ -1040,21 +1060,32 @@ void DDLWorker::cleanupQueue(Int64 current_time_seconds, const ZooKeeperPtr & zo
void DDLWorker::createStatusDirs(const std::string & node_path, const ZooKeeperPtr & zookeeper)
{
Coordination::Requests ops;
{
Coordination::CreateRequest request;
request.path = fs::path(node_path) / "active";
ops.emplace_back(std::make_shared<Coordination::CreateRequest>(std::move(request)));
}
{
Coordination::CreateRequest request;
request.path = fs::path(node_path) / "finished";
ops.emplace_back(std::make_shared<Coordination::CreateRequest>(std::move(request)));
}
ops.emplace_back(zkutil::makeCreateRequest(fs::path(node_path) / "active", {}, zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(fs::path(node_path) / "finished", {}, zkutil::CreateMode::Persistent));
Coordination::Responses responses;
Coordination::Error code = zookeeper->tryMulti(ops, responses);
if (code != Coordination::Error::ZOK
&& code != Coordination::Error::ZNODEEXISTS)
throw Coordination::Exception(code);
bool both_created = code == Coordination::Error::ZOK;
/// Failed on attempt to create node_path/active because it exists, so node_path/finished must exist too
bool both_already_exists = responses.size() == 2 && responses[0]->error == Coordination::Error::ZNODEEXISTS
&& responses[1]->error == Coordination::Error::ZRUNTIMEINCONSISTENCY;
assert(!both_already_exists || (zookeeper->exists(fs::path(node_path) / "active") && zookeeper->exists(fs::path(node_path) / "finished")));
/// Failed on attempt to create node_path/finished, but node_path/active does not exist
bool is_currently_deleting = responses.size() == 2 && responses[0]->error == Coordination::Error::ZOK
&& responses[1]->error == Coordination::Error::ZNODEEXISTS;
if (both_created || both_already_exists)
return;
if (is_currently_deleting)
throw Exception(ErrorCodes::UNFINISHED, "Cannot create status dirs for {}, "
"most likely because someone is deleting it concurrently", node_path);
/// Connection lost or entry was removed
assert(Coordination::isHardwareError(code) || code == Coordination::Error::ZNONODE);
zkutil::KeeperMultiException::check(code, ops, responses);
}
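The response handling above depends on how a failed multi-request reports per-operation errors. A small sketch of that interpretation, mirroring the branch conditions in the diff; the enum and helper name are illustrative:

// Sketch only: classifies the outcome of creating node_path/active and node_path/finished
// together, using the same per-operation checks as the code above.
// Assumes #include <Common/ZooKeeper/ZooKeeper.h> and <Common/ZooKeeper/KeeperException.h>.
enum class StatusDirsOutcome { Created, AlreadyExist, BeingDeleted, OtherError };

StatusDirsOutcome classify(Coordination::Error code, const Coordination::Responses & responses)
{
    if (code == Coordination::Error::ZOK)
        return StatusDirsOutcome::Created;
    if (responses.size() == 2
        && responses[0]->error == Coordination::Error::ZNODEEXISTS
        && responses[1]->error == Coordination::Error::ZRUNTIMEINCONSISTENCY)
        return StatusDirsOutcome::AlreadyExist;   /// "active" exists, so "finished" must exist too
    if (responses.size() == 2
        && responses[0]->error == Coordination::Error::ZOK
        && responses[1]->error == Coordination::Error::ZNODEEXISTS)
        return StatusDirsOutcome::BeingDeleted;   /// only "finished" is left: cleanup is removing the entry
    return StatusDirsOutcome::OtherError;         /// connection loss, or the entry is already gone (ZNONODE)
}
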
@@ -1114,7 +1145,7 @@ void DDLWorker::runMainThread()
if (!Coordination::isHardwareError(e.code))
{
/// A logical error.
LOG_ERROR(log, "ZooKeeper error: {}. Failed to start DDLWorker.",getCurrentExceptionMessage(true));
LOG_ERROR(log, "ZooKeeper error: {}. Failed to start DDLWorker.", getCurrentExceptionMessage(true));
reset_state(false);
assert(false); /// Catch such failures in tests with debug build
}


@@ -751,7 +751,7 @@ void StorageReplicatedMergeTree::drop()
auto zookeeper = global_context.getZooKeeper();
/// If probably there is metadata in ZooKeeper, we don't allow to drop the table.
if (is_readonly || !zookeeper)
if (!zookeeper)
throw Exception("Can't drop readonly replicated table (need to drop data in ZooKeeper as well)", ErrorCodes::TABLE_IS_READ_ONLY);
shutdown();


@@ -10,8 +10,8 @@ from helpers.test_tools import TSV
class ClickHouseClusterWithDDLHelpers(ClickHouseCluster):
def __init__(self, base_path, config_dir):
ClickHouseCluster.__init__(self, base_path)
def __init__(self, base_path, config_dir, testcase_name):
ClickHouseCluster.__init__(self, base_path, name=testcase_name)
self.test_config_dir = config_dir
@@ -104,8 +104,8 @@ class ClickHouseClusterWithDDLHelpers(ClickHouseCluster):
def ddl_check_there_are_no_dublicates(instance):
query = "SELECT max(c), argMax(q, c) FROM (SELECT lower(query) AS q, count() AS c FROM system.query_log WHERE type=2 AND q LIKE '/* ddl_entry=query-%' GROUP BY query)"
rows = instance.query(query)
assert len(rows) > 0 and rows[0][0] == "1", "dublicates on {} {}, query {}".format(instance.name,
instance.ip_address, query)
assert len(rows) > 0 and rows[0][0] == "1", "dublicates on {} {}: {}".format(instance.name,
instance.ip_address, rows)
@staticmethod
def insert_reliable(instance, query_insert):


@@ -14,7 +14,7 @@ from .cluster import ClickHouseClusterWithDDLHelpers
@pytest.fixture(scope="module", params=["configs", "configs_secure"])
def test_cluster(request):
cluster = ClickHouseClusterWithDDLHelpers(__file__, request.param)
cluster = ClickHouseClusterWithDDLHelpers(__file__, request.param, request.param)
try:
cluster.prepare()


@@ -12,7 +12,7 @@ from .cluster import ClickHouseClusterWithDDLHelpers
@pytest.fixture(scope="module", params=["configs", "configs_secure"])
def test_cluster(request):
cluster = ClickHouseClusterWithDDLHelpers(__file__, request.param)
cluster = ClickHouseClusterWithDDLHelpers(__file__, request.param, "alters_" + request.param)
try:
# TODO: Fix ON CLUSTER alters when nodes have different configs. Need to canonicalize node identity.


@@ -22,12 +22,12 @@ DROP TABLE IF EXISTS test_r1;
DROP TABLE IF EXISTS test_r2;
CREATE TABLE test_r1 (x UInt64, "\\" String DEFAULT '\r\n\t\\' || '
') ENGINE = ReplicatedMergeTree('/clickhouse/test', 'r1') ORDER BY "\\";
') ENGINE = ReplicatedMergeTree('/clickhouse/test_01669', 'r1') ORDER BY "\\";
INSERT INTO test_r1 ("\\") VALUES ('\\');
CREATE TABLE test_r2 (x UInt64, "\\" String DEFAULT '\r\n\t\\' || '
') ENGINE = ReplicatedMergeTree('/clickhouse/test', 'r2') ORDER BY "\\";
') ENGINE = ReplicatedMergeTree('/clickhouse/test_01669', 'r2') ORDER BY "\\";
SYSTEM SYNC REPLICA test_r2;


@@ -574,6 +574,7 @@
"01676_dictget_in_default_expression",
"01715_background_checker_blather_zookeeper",
"01700_system_zookeeper_path_in",
"01669_columns_declaration_serde",
"attach",
"ddl_dictionaries",
"dictionary",