diff --git a/src/Common/ZooKeeper/ZooKeeper.cpp b/src/Common/ZooKeeper/ZooKeeper.cpp
index 4537d5ad8cd..a1c6eb9b481 100644
--- a/src/Common/ZooKeeper/ZooKeeper.cpp
+++ b/src/Common/ZooKeeper/ZooKeeper.cpp
@@ -602,7 +602,7 @@ void ZooKeeper::removeChildren(const std::string & path)
 }
 
 
-void ZooKeeper::removeChildrenRecursive(const std::string & path)
+void ZooKeeper::removeChildrenRecursive(const std::string & path, const String & keep_child_node)
 {
     Strings children = getChildren(path);
     while (!children.empty())
@@ -611,14 +611,15 @@ void ZooKeeper::removeChildrenRecursive(const std::string & path)
         for (size_t i = 0; i < MULTI_BATCH_SIZE && !children.empty(); ++i)
         {
             removeChildrenRecursive(path + "/" + children.back());
-            ops.emplace_back(makeRemoveRequest(path + "/" + children.back(), -1));
+            if (likely(keep_child_node.empty() || keep_child_node != children.back()))
+                ops.emplace_back(makeRemoveRequest(path + "/" + children.back(), -1));
             children.pop_back();
         }
         multi(ops);
     }
 }
 
-void ZooKeeper::tryRemoveChildrenRecursive(const std::string & path)
+void ZooKeeper::tryRemoveChildrenRecursive(const std::string & path, const String & keep_child_node)
 {
     Strings children;
     if (tryGetChildren(path, children) != Coordination::Error::ZOK)
@@ -629,14 +630,14 @@ void ZooKeeper::tryRemoveChildrenRecursive(const std::string & path)
         Strings batch;
         for (size_t i = 0; i < MULTI_BATCH_SIZE && !children.empty(); ++i)
         {
-            batch.push_back(path + "/" + children.back());
+            String child_path = path + "/" + children.back();
+            tryRemoveChildrenRecursive(child_path);
+            if (likely(keep_child_node.empty() || keep_child_node != children.back()))
+            {
+                batch.push_back(child_path);
+                ops.emplace_back(zkutil::makeRemoveRequest(child_path, -1));
+            }
             children.pop_back();
-            tryRemoveChildrenRecursive(batch.back());
-
-            Coordination::RemoveRequest request;
-            request.path = batch.back();
-
-            ops.emplace_back(std::make_shared<Coordination::RemoveRequest>(std::move(request)));
         }
 
         /// Try to remove the children with a faster method - in bulk. If this fails,
diff --git a/src/Common/ZooKeeper/ZooKeeper.h b/src/Common/ZooKeeper/ZooKeeper.h
index 0d9dc104c48..90d15e2ac4a 100644
--- a/src/Common/ZooKeeper/ZooKeeper.h
+++ b/src/Common/ZooKeeper/ZooKeeper.h
@@ -184,6 +184,12 @@ public:
     /// result would be the same as for the single call.
     void tryRemoveRecursive(const std::string & path);
 
+    /// Similar to removeRecursive(...) and tryRemoveRecursive(...), but does not remove path itself.
+    /// If keep_child_node is not empty, this method will not remove path/keep_child_node (but will remove its subtree).
+    /// It can be useful to keep some child node as a flag which indicates that path is currently being removed.
+    void removeChildrenRecursive(const std::string & path, const String & keep_child_node = {});
+    void tryRemoveChildrenRecursive(const std::string & path, const String & keep_child_node = {});
+
     /// Remove all children nodes (non recursive).
     void removeChildren(const std::string & path);
 
@@ -246,9 +252,6 @@ private:
     void init(const std::string & implementation_, const std::string & hosts_, const std::string & identity_,
               int32_t session_timeout_ms_, int32_t operation_timeout_ms_, const std::string & chroot_);
 
-    void removeChildrenRecursive(const std::string & path);
-    void tryRemoveChildrenRecursive(const std::string & path);
-
     /// The following methods don't throw exceptions but return error codes.
     Coordination::Error createImpl(const std::string & path, const std::string & data, int32_t mode, std::string & path_created);
     Coordination::Error removeImpl(const std::string & path, int32_t version);
@@ -320,7 +323,7 @@ public:
         catch (...)
         {
             ProfileEvents::increment(ProfileEvents::CannotRemoveEphemeralNode);
-            DB::tryLogCurrentException(__PRETTY_FUNCTION__);
+            DB::tryLogCurrentException(__PRETTY_FUNCTION__, "Cannot remove " + path + ": ");
         }
     }
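A minimal usage sketch of the new keep_child_node parameter (the cleanupEntry helper and the entry_path layout are hypothetical, for illustration only):

#include <Common/ZooKeeper/KeeperException.h>
#include <Common/ZooKeeper/ZooKeeper.h>

/// Hypothetical helper: remove an entry's subtree while keeping entry_path/finished
/// as a marker that the entry is currently being removed.
void cleanupEntry(zkutil::ZooKeeper & zookeeper, const std::string & entry_path)
{
    /// Removes every child of entry_path recursively, except entry_path/finished itself
    /// (its subtree is still removed). Concurrent observers can treat the surviving
    /// "finished" node as "this entry is being deleted".
    zookeeper.tryRemoveChildrenRecursive(entry_path, "finished");

    /// Then drop the marker and the entry in one transaction, so the entry
    /// never exists without the marker.
    Coordination::Requests ops;
    ops.emplace_back(zkutil::makeRemoveRequest(entry_path + "/finished", -1));
    ops.emplace_back(zkutil::makeRemoveRequest(entry_path, -1));
    zookeeper.multi(ops);
}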
diff --git a/src/Interpreters/DDLWorker.cpp b/src/Interpreters/DDLWorker.cpp
index 05370a6a3b7..fc460a5584c 100644
--- a/src/Interpreters/DDLWorker.cpp
+++ b/src/Interpreters/DDLWorker.cpp
@@ -652,15 +652,10 @@ void DDLWorker::enqueueTask(DDLTaskPtr task_ptr)
         {
             recoverZooKeeper();
         }
-        else if (e.code == Coordination::Error::ZNONODE)
-        {
-            LOG_ERROR(log, "ZooKeeper error: {}", getCurrentExceptionMessage(true));
-            // TODO: retry?
-        }
         else
         {
             LOG_ERROR(log, "Unexpected ZooKeeper error: {}.", getCurrentExceptionMessage(true));
-            return;
+            throw;
         }
     }
     catch (...)
@@ -695,25 +690,44 @@ void DDLWorker::processTask(DDLTask & task)
 
     LOG_DEBUG(log, "Processing task {} ({})", task.entry_name, task.entry.query);
 
-    String dummy;
     String active_node_path = task.entry_path + "/active/" + task.host_id_str;
     String finished_node_path = task.entry_path + "/finished/" + task.host_id_str;
 
-    auto code = zookeeper->tryCreate(active_node_path, "", zkutil::CreateMode::Ephemeral, dummy);
+    /// It will tryRemove(...) the node on exception
+    auto active_node = zkutil::EphemeralNodeHolder::existing(active_node_path, *zookeeper);
 
-    if (code == Coordination::Error::ZOK || code == Coordination::Error::ZNODEEXISTS)
+    /// Try fast path
+    auto create_active_res = zookeeper->tryCreate(active_node_path, {}, zkutil::CreateMode::Ephemeral);
+    if (create_active_res != Coordination::Error::ZOK)
     {
-        // Ok
+        if (create_active_res != Coordination::Error::ZNONODE && create_active_res != Coordination::Error::ZNODEEXISTS)
+        {
+            assert(Coordination::isHardwareError(create_active_res));
+            throw Coordination::Exception(create_active_res, active_node_path);
+        }
+
+        /// Status dirs were not created in enqueueQuery(...) or someone is removing the entry
+        if (create_active_res == Coordination::Error::ZNONODE)
+            createStatusDirs(task.entry_path, zookeeper);
+
+        if (create_active_res == Coordination::Error::ZNODEEXISTS)
+        {
+            /// Connection has been lost and now we are retrying to write query status,
+            /// but our previous ephemeral node still exists.
+            assert(task.was_executed);
+            zkutil::EventPtr eph_node_disappeared = std::make_shared<Poco::Event>();
+            String dummy;
+            if (zookeeper->tryGet(active_node_path, dummy, nullptr, eph_node_disappeared))
+            {
+                constexpr int timeout_ms = 5000;
+                if (!eph_node_disappeared->tryWait(timeout_ms))
+                    throw Exception(ErrorCodes::LOGICAL_ERROR, "Ephemeral node {} still exists, "
+                                    "probably it's owned by someone else", active_node_path);
+            }
+        }
+
+        zookeeper->create(active_node_path, {}, zkutil::CreateMode::Ephemeral);
     }
-    else if (code == Coordination::Error::ZNONODE)
-    {
-        /// There is no parent
-        createStatusDirs(task.entry_path, zookeeper);
-        if (Coordination::Error::ZOK != zookeeper->tryCreate(active_node_path, "", zkutil::CreateMode::Ephemeral, dummy))
-            throw Coordination::Exception(code, active_node_path);
-    }
-    else
-        throw Coordination::Exception(code, active_node_path);
 
     if (!task.was_executed)
     {
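The ZNODEEXISTS branch above waits for the stale ephemeral node left by the previous session to expire before recreating it. The same pattern as a standalone sketch (the helper name and std::runtime_error are illustrative, not part of the patch):

#include <stdexcept>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Poco/Event.h>

/// Hypothetical helper demonstrating the wait-then-recreate pattern.
void recreateEphemeralNode(zkutil::ZooKeeper & zookeeper, const std::string & node_path)
{
    /// tryGet(...) sets a watch: the event fires when the node disappears,
    /// e.g. when the server expires the previous session and drops its ephemeral nodes.
    auto node_disappeared = std::make_shared<Poco::Event>();
    std::string content;
    if (zookeeper.tryGet(node_path, content, nullptr, node_disappeared))
    {
        constexpr int timeout_ms = 5000;
        if (!node_disappeared->tryWait(timeout_ms))
            throw std::runtime_error("Node " + node_path + " still exists, probably owned by another session");
    }

    /// The old node is gone: recreate it under the current session.
    zookeeper.create(node_path, "", zkutil::CreateMode::Ephemeral);
}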
@@ -969,7 +983,6 @@ void DDLWorker::cleanupQueue(Int64 current_time_seconds, const ZooKeeperPtr & zo
 
             String node_name = *it;
             String node_path = fs::path(queue_dir) / node_name;
-            String lock_path = fs::path(node_path) / "lock";
 
             Coordination::Stat stat;
             String dummy;
@@ -991,19 +1004,14 @@ void DDLWorker::cleanupQueue(Int64 current_time_seconds, const ZooKeeperPtr & zo
             if (!node_lifetime_is_expired && !node_is_outside_max_window)
                 continue;
 
-            /// Skip if there are active nodes (it is weak guard)
-            if (zookeeper->exists(fs::path(node_path) / "active", &stat) && stat.numChildren > 0)
+            /// First we remove the entry/active node, to prevent stale hosts from executing the entry concurrently
+            auto rm_active_res = zookeeper->tryRemove(fs::path(node_path) / "active");
+            if (rm_active_res != Coordination::Error::ZOK && rm_active_res != Coordination::Error::ZNONODE)
             {
-                LOG_INFO(log, "Task {} should be deleted, but there are active workers. Skipping it.", node_name);
-                continue;
-            }
-
-            /// Usage of the lock is not necessary now (tryRemoveRecursive correctly removes node in a presence of concurrent cleaners)
-            /// But the lock will be required to implement system.distributed_ddl_queue table
-            auto lock = createSimpleZooKeeperLock(zookeeper, node_path, "lock", host_fqdn_id);
-            if (!lock->tryLock())
-            {
-                LOG_INFO(log, "Task {} should be deleted, but it is locked. Skipping it.", node_name);
+                if (rm_active_res == Coordination::Error::ZNOTEMPTY)
+                    LOG_DEBUG(log, "Task {} should be deleted, but there are active workers. Skipping it.", node_name);
+                else
+                    LOG_WARNING(log, "Unexpected status code {} on attempt to remove {}/active", rm_active_res, node_name);
                 continue;
             }
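The interlock this change introduces is worth spelling out: tryRemove on the active dir fails with ZNOTEMPTY while any worker holds an ephemeral child in it, and once active is gone a stale worker's tryCreate of active/<host> fails with ZNONODE. A sketch under those assumptions (the tryFenceEntry helper is hypothetical):

#include <Common/ZooKeeper/ZooKeeper.h>

/// Hypothetical helper: returns true if it is safe to proceed with deleting the entry.
bool tryFenceEntry(zkutil::ZooKeeper & zookeeper, const std::string & entry_path)
{
    auto res = zookeeper.tryRemove(entry_path + "/active");
    if (res == Coordination::Error::ZNOTEMPTY)
        return false;    /// Some worker still holds an ephemeral child: skip this entry.

    /// ZOK (we removed it) or ZNONODE (already removed): from now on, a stale worker's
    /// tryCreate(entry_path + "/active/<host>", ...) fails with ZNONODE, so it cannot
    /// start executing the entry while we are deleting it.
    return res == Coordination::Error::ZOK || res == Coordination::Error::ZNONODE;
}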
Skipping it.", node_name); + else + LOG_WARNING(log, "Unexpected status code {} on attempt to remove {}/active", rm_active_res, node_name); continue; } @@ -1012,21 +1020,33 @@ void DDLWorker::cleanupQueue(Int64 current_time_seconds, const ZooKeeperPtr & zo else if (node_is_outside_max_window) LOG_INFO(log, "Task {} is outdated, deleting it", node_name); - /// Deleting - { - Strings children = zookeeper->getChildren(node_path); - for (const String & child : children) - { - if (child != "lock") - zookeeper->tryRemoveRecursive(fs::path(node_path) / child); - } + /// We recursively delete all nodes except node_path/finished to prevent staled hosts from + /// creating node_path/active node (see createStatusDirs(...)) + zookeeper->tryRemoveChildrenRecursive(node_path, "finished"); - /// Remove the lock node and its parent atomically - Coordination::Requests ops; - ops.emplace_back(zkutil::makeRemoveRequest(lock_path, -1)); - ops.emplace_back(zkutil::makeRemoveRequest(node_path, -1)); - zookeeper->multi(ops); + /// And then we remove node_path and node_path/finished in a single transaction + Coordination::Requests ops; + Coordination::Responses res; + ops.emplace_back(zkutil::makeCheckRequest(node_path, -1)); /// See a comment below + ops.emplace_back(zkutil::makeRemoveRequest(fs::path(node_path) / "finished", -1)); + ops.emplace_back(zkutil::makeRemoveRequest(node_path, -1)); + auto rm_entry_res = zookeeper->tryMulti(ops, res); + if (rm_entry_res == Coordination::Error::ZNONODE) + { + /// Most likely both node_path/finished and node_path were removed concurrently. + bool entry_removed_concurrently = res[0]->error == Coordination::Error::ZNONODE; + if (entry_removed_concurrently) + continue; + + /// Possible rare case: initiator node has lost connection after enqueueing entry and failed to create status dirs. + /// No one has started to process the entry, so node_path/active and node_path/finished nodes were never created, node_path has no children. + /// Entry became outdated, but we cannot remove remove it in a transaction with node_path/finished. + assert(res[0]->error == Coordination::Error::ZOK && res[1]->error == Coordination::Error::ZNONODE); + rm_entry_res = zookeeper->tryRemove(node_path); + assert(rm_entry_res != Coordination::Error::ZNOTEMPTY); + continue; } + zkutil::KeeperMultiException::check(rm_entry_res, ops, res); } catch (...) 
@@ -1040,21 +1060,32 @@ void DDLWorker::createStatusDirs(const std::string & node_path, const ZooKeeperPtr & zookeeper)
 {
     Coordination::Requests ops;
-    {
-        Coordination::CreateRequest request;
-        request.path = fs::path(node_path) / "active";
-        ops.emplace_back(std::make_shared<Coordination::CreateRequest>(std::move(request)));
-    }
-    {
-        Coordination::CreateRequest request;
-        request.path = fs::path(node_path) / "finished";
-        ops.emplace_back(std::make_shared<Coordination::CreateRequest>(std::move(request)));
-    }
+    ops.emplace_back(zkutil::makeCreateRequest(fs::path(node_path) / "active", {}, zkutil::CreateMode::Persistent));
+    ops.emplace_back(zkutil::makeCreateRequest(fs::path(node_path) / "finished", {}, zkutil::CreateMode::Persistent));
+
     Coordination::Responses responses;
     Coordination::Error code = zookeeper->tryMulti(ops, responses);
-    if (code != Coordination::Error::ZOK
-        && code != Coordination::Error::ZNODEEXISTS)
-        throw Coordination::Exception(code);
+
+    bool both_created = code == Coordination::Error::ZOK;
+
+    /// Failed on the attempt to create node_path/active because it exists, so node_path/finished must exist too
+    bool both_already_exists = responses.size() == 2 && responses[0]->error == Coordination::Error::ZNODEEXISTS
+                                                     && responses[1]->error == Coordination::Error::ZRUNTIMEINCONSISTENCY;
+    assert(!both_already_exists || (zookeeper->exists(fs::path(node_path) / "active") && zookeeper->exists(fs::path(node_path) / "finished")));
+
+    /// Failed on the attempt to create node_path/finished because it exists, while node_path/active does not exist
+    bool is_currently_deleting = responses.size() == 2 && responses[0]->error == Coordination::Error::ZOK
+                                                       && responses[1]->error == Coordination::Error::ZNODEEXISTS;
+    if (both_created || both_already_exists)
+        return;
+
+    if (is_currently_deleting)
+        throw Exception(ErrorCodes::UNFINISHED, "Cannot create status dirs for {}, "
+                        "most likely because someone is deleting it concurrently", node_path);
+
+    /// Connection was lost or the entry was removed
+    assert(Coordination::isHardwareError(code) || code == Coordination::Error::ZNONODE);
+    zkutil::KeeperMultiException::check(code, ops, responses);
 }
@@ -1114,7 +1145,7 @@ void DDLWorker::runMainThread()
             if (!Coordination::isHardwareError(e.code))
             {
                 /// A logical error.
-                LOG_ERROR(log, "ZooKeeper error: {}. Failed to start DDLWorker.",getCurrentExceptionMessage(true));
+                LOG_ERROR(log, "ZooKeeper error: {}. Failed to start DDLWorker.", getCurrentExceptionMessage(true));
                 reset_state(false);
                 assert(false);  /// Catch such failures in tests with debug build
             }
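For reference, the three outcomes handled in createStatusDirs(...) can be enumerated explicitly; a sketch assuming standard ZooKeeper multi semantics (the enum and helper are hypothetical):

#include <Common/ZooKeeper/ZooKeeper.h>

enum class StatusDirsResult { Created, AlreadyExist, BeingDeleted, Error };

StatusDirsResult classifyStatusDirsCreation(zkutil::ZooKeeper & zookeeper, const std::string & entry_path)
{
    Coordination::Requests ops;
    ops.emplace_back(zkutil::makeCreateRequest(entry_path + "/active", {}, zkutil::CreateMode::Persistent));
    ops.emplace_back(zkutil::makeCreateRequest(entry_path + "/finished", {}, zkutil::CreateMode::Persistent));

    Coordination::Responses responses;
    auto code = zookeeper.tryMulti(ops, responses);
    if (code == Coordination::Error::ZOK)
        return StatusDirsResult::Created;

    /// The first create failed with ZNODEEXISTS, so the second was not applied
    /// (it is reported as ZRUNTIMEINCONSISTENCY): both dirs already exist.
    if (responses.size() == 2 && responses[0]->error == Coordination::Error::ZNODEEXISTS)
        return StatusDirsResult::AlreadyExist;

    /// "active" would have been created, but "finished" already exists: the cleaner
    /// kept "finished" as a marker and is deleting the entry right now.
    if (responses.size() == 2 && responses[0]->error == Coordination::Error::ZOK
        && responses[1]->error == Coordination::Error::ZNODEEXISTS)
        return StatusDirsResult::BeingDeleted;

    return StatusDirsResult::Error;    /// Connection loss or the entry was removed.
}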
diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp
index 097b7679899..518577c473c 100644
--- a/src/Storages/StorageReplicatedMergeTree.cpp
+++ b/src/Storages/StorageReplicatedMergeTree.cpp
@@ -751,7 +751,7 @@ void StorageReplicatedMergeTree::drop()
         auto zookeeper = global_context.getZooKeeper();
 
         /// If probably there is metadata in ZooKeeper, we don't allow to drop the table.
-        if (is_readonly || !zookeeper)
+        if (!zookeeper)
             throw Exception("Can't drop readonly replicated table (need to drop data in ZooKeeper as well)", ErrorCodes::TABLE_IS_READ_ONLY);
 
         shutdown();
diff --git a/tests/integration/test_distributed_ddl/cluster.py b/tests/integration/test_distributed_ddl/cluster.py
index 811eb94bad4..24f11fec547 100644
--- a/tests/integration/test_distributed_ddl/cluster.py
+++ b/tests/integration/test_distributed_ddl/cluster.py
@@ -10,8 +10,8 @@ from helpers.test_tools import TSV
 
 
 class ClickHouseClusterWithDDLHelpers(ClickHouseCluster):
-    def __init__(self, base_path, config_dir):
-        ClickHouseCluster.__init__(self, base_path)
+    def __init__(self, base_path, config_dir, testcase_name):
+        ClickHouseCluster.__init__(self, base_path, name=testcase_name)
 
         self.test_config_dir = config_dir
 
@@ -104,8 +104,8 @@ class ClickHouseClusterWithDDLHelpers(ClickHouseCluster):
     def ddl_check_there_are_no_dublicates(instance):
         query = "SELECT max(c), argMax(q, c) FROM (SELECT lower(query) AS q, count() AS c FROM system.query_log WHERE type=2 AND q LIKE '/* ddl_entry=query-%' GROUP BY query)"
         rows = instance.query(query)
-        assert len(rows) > 0 and rows[0][0] == "1", "dublicates on {} {}, query {}".format(instance.name,
-                                                                                           instance.ip_address, query)
+        assert len(rows) > 0 and rows[0][0] == "1", "duplicates on {} {}: {}".format(instance.name,
+                                                                                     instance.ip_address, rows)
 
     @staticmethod
     def insert_reliable(instance, query_insert):
diff --git a/tests/integration/test_distributed_ddl/test.py b/tests/integration/test_distributed_ddl/test.py
index f0e78dfec41..58e1d0d06f7 100755
--- a/tests/integration/test_distributed_ddl/test.py
+++ b/tests/integration/test_distributed_ddl/test.py
@@ -14,7 +14,7 @@ from .cluster import ClickHouseClusterWithDDLHelpers
 
 @pytest.fixture(scope="module", params=["configs", "configs_secure"])
 def test_cluster(request):
-    cluster = ClickHouseClusterWithDDLHelpers(__file__, request.param)
+    cluster = ClickHouseClusterWithDDLHelpers(__file__, request.param, request.param)
 
     try:
         cluster.prepare()
diff --git a/tests/integration/test_distributed_ddl/test_replicated_alter.py b/tests/integration/test_distributed_ddl/test_replicated_alter.py
index bd95f5660b7..148ad5fca5e 100644
--- a/tests/integration/test_distributed_ddl/test_replicated_alter.py
+++ b/tests/integration/test_distributed_ddl/test_replicated_alter.py
@@ -12,7 +12,7 @@ from .cluster import ClickHouseClusterWithDDLHelpers
 
 @pytest.fixture(scope="module", params=["configs", "configs_secure"])
 def test_cluster(request):
-    cluster = ClickHouseClusterWithDDLHelpers(__file__, request.param)
+    cluster = ClickHouseClusterWithDDLHelpers(__file__, request.param, "alters_" + request.param)
 
     try:
         # TODO: Fix ON CLUSTER alters when nodes have different configs. Need to canonicalize node identity.
diff --git a/tests/queries/0_stateless/01669_columns_declaration_serde.sql b/tests/queries/0_stateless/01669_columns_declaration_serde.sql
index 8e3354d63cd..a6bf1184e9f 100644
--- a/tests/queries/0_stateless/01669_columns_declaration_serde.sql
+++ b/tests/queries/0_stateless/01669_columns_declaration_serde.sql
@@ -22,12 +22,12 @@ DROP TABLE IF EXISTS test_r1;
 DROP TABLE IF EXISTS test_r2;
 
 CREATE TABLE test_r1 (x UInt64, "\\" String DEFAULT '\r\n\t\\' || '
-') ENGINE = ReplicatedMergeTree('/clickhouse/test', 'r1') ORDER BY "\\";
+') ENGINE = ReplicatedMergeTree('/clickhouse/test_01669', 'r1') ORDER BY "\\";
 
 INSERT INTO test_r1 ("\\") VALUES ('\\');
 
 CREATE TABLE test_r2 (x UInt64, "\\" String DEFAULT '\r\n\t\\' || '
-') ENGINE = ReplicatedMergeTree('/clickhouse/test', 'r2') ORDER BY "\\";
+') ENGINE = ReplicatedMergeTree('/clickhouse/test_01669', 'r2') ORDER BY "\\";
 
 SYSTEM SYNC REPLICA test_r2;
diff --git a/tests/queries/skip_list.json b/tests/queries/skip_list.json
index e4e7504ba41..32e01a16277 100644
--- a/tests/queries/skip_list.json
+++ b/tests/queries/skip_list.json
@@ -574,6 +574,7 @@
         "01676_dictget_in_default_expression",
         "01715_background_checker_blather_zookeeper",
         "01700_system_zookeeper_path_in",
+        "01669_columns_declaration_serde",
         "attach",
         "ddl_dictionaries",
         "dictionary",