From 4e9ec5dc2f7c26ee0a22655acfb195e94caa3c33 Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Tue, 5 Apr 2022 00:51:48 +0200 Subject: [PATCH 1/3] make some replicated DDL faster --- src/Common/ZooKeeper/ZooKeeper.cpp | 51 +++++- src/Common/ZooKeeper/ZooKeeper.h | 5 +- src/Databases/DatabaseReplicated.cpp | 1 + src/Databases/DatabaseReplicatedWorker.cpp | 27 +++- src/Databases/DatabaseReplicatedWorker.h | 2 + src/Interpreters/DDLWorker.cpp | 2 +- src/Storages/StorageReplicatedMergeTree.cpp | 166 +++++++++++++++----- 7 files changed, 200 insertions(+), 54 deletions(-) diff --git a/src/Common/ZooKeeper/ZooKeeper.cpp b/src/Common/ZooKeeper/ZooKeeper.cpp index 118789c0ffc..aae3b6d4191 100644 --- a/src/Common/ZooKeeper/ZooKeeper.cpp +++ b/src/Common/ZooKeeper/ZooKeeper.cpp @@ -701,24 +701,34 @@ void ZooKeeper::removeChildrenRecursive(const std::string & path, const String & } } -void ZooKeeper::tryRemoveChildrenRecursive(const std::string & path, const String & keep_child_node) +bool ZooKeeper::tryRemoveChildrenRecursive(const std::string & path, bool probably_flat, const String & keep_child_node) { Strings children; if (tryGetChildren(path, children) != Coordination::Error::ZOK) - return; + return false; + + bool removed_as_expected = true; while (!children.empty()) { Coordination::Requests ops; Strings batch; + ops.reserve(MULTI_BATCH_SIZE); + batch.reserve(MULTI_BATCH_SIZE); for (size_t i = 0; i < MULTI_BATCH_SIZE && !children.empty(); ++i) { String child_path = fs::path(path) / children.back(); - tryRemoveChildrenRecursive(child_path); + + /// Will try to avoid recursive getChildren calls if child_path probably has no children. + /// It may be extremely slow when path contain a lot of leaf children. + if (!probably_flat) + tryRemoveChildrenRecursive(child_path); + if (likely(keep_child_node.empty() || keep_child_node != children.back())) { batch.push_back(child_path); ops.emplace_back(zkutil::makeRemoveRequest(child_path, -1)); } + children.pop_back(); } @@ -726,10 +736,39 @@ void ZooKeeper::tryRemoveChildrenRecursive(const std::string & path, const Strin /// this means someone is concurrently removing these children and we will have /// to remove them one by one. Coordination::Responses responses; - if (tryMulti(ops, responses) != Coordination::Error::ZOK) - for (const std::string & child : batch) - tryRemove(child); + if (tryMulti(ops, responses) == Coordination::Error::ZOK) + continue; + + removed_as_expected = false; + + std::vector futures; + futures.reserve(batch.size()); + for (const std::string & child : batch) + futures.push_back(asyncTryRemoveNoThrow(child, -1)); + + for (size_t i = 0; i < batch.size(); ++i) + { + auto res = futures[i].get(); + if (res.error == Coordination::Error::ZOK) + continue; + if (res.error == Coordination::Error::ZNONODE) + continue; + + if (res.error == Coordination::Error::ZNOTEMPTY) + { + if (probably_flat) + { + /// It actually has children, let's remove them + tryRemoveChildrenRecursive(batch[i]); + tryRemove(batch[i]); + } + continue; + } + + throw KeeperException(res.error, batch[i]); + } } + return removed_as_expected; } void ZooKeeper::removeRecursive(const std::string & path) diff --git a/src/Common/ZooKeeper/ZooKeeper.h b/src/Common/ZooKeeper/ZooKeeper.h index f901a79591f..0f7eccd2547 100644 --- a/src/Common/ZooKeeper/ZooKeeper.h +++ b/src/Common/ZooKeeper/ZooKeeper.h @@ -225,7 +225,10 @@ public: /// If keep_child_node is not empty, this method will not remove path/keep_child_node (but will remove its subtree). 
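The key behavioural change above is that tryRemoveChildrenRecursive() now takes a probably_flat hint and reports whether the optimistic flat removal succeeded. A minimal caller-side sketch, assuming only the zkutil::ZooKeeper methods shown in this hunk (the task_path variable and the scenario are hypothetical):

    /// Optimistically assume task_path has only leaf children; keep the
    /// "finished" child as a marker that the path is being removed.
    bool removed_flat = zookeeper->tryRemoveChildrenRecursive(
        task_path, /* probably_flat */ true, /* keep_child_node */ "finished");

    if (!removed_flat)
    {
        /// Some child unexpectedly had children of its own. The method has
        /// already fallen back to per-child recursive removal, so the return
        /// value is only a hint (e.g. worth a log warning) that the flat
        /// assumption did not hold.
    }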
/// It can be useful to keep some child node as a flag which indicates that path is currently removing. void removeChildrenRecursive(const std::string & path, const String & keep_child_node = {}); - void tryRemoveChildrenRecursive(const std::string & path, const String & keep_child_node = {}); + /// If probably_flat is true, this method will optimistically try to remove children non-recursive + /// and will fall back to recursive removal if it gets ZNOTEMPTY for some child. + /// Returns true if no kind of fallback happened. + bool tryRemoveChildrenRecursive(const std::string & path, bool probably_flat = false, const String & keep_child_node = {}); /// Remove all children nodes (non recursive). void removeChildren(const std::string & path); diff --git a/src/Databases/DatabaseReplicated.cpp b/src/Databases/DatabaseReplicated.cpp index 0c3cc56c061..2337a063a5e 100644 --- a/src/Databases/DatabaseReplicated.cpp +++ b/src/Databases/DatabaseReplicated.cpp @@ -675,6 +675,7 @@ void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeep } } current_zookeeper->set(replica_path + "/log_ptr", toString(max_log_ptr)); + ddl_worker->updateLogPointer(DDLTaskBase::getLogEntryName(max_log_ptr)); } std::map DatabaseReplicated::tryGetConsistentMetadataSnapshot(const ZooKeeperPtr & zookeeper, UInt32 & max_log_ptr) diff --git a/src/Databases/DatabaseReplicatedWorker.cpp b/src/Databases/DatabaseReplicatedWorker.cpp index b45cfb16362..e9475b56377 100644 --- a/src/Databases/DatabaseReplicatedWorker.cpp +++ b/src/Databases/DatabaseReplicatedWorker.cpp @@ -66,9 +66,14 @@ void DatabaseReplicatedDDLWorker::initializeReplication() UInt32 max_log_ptr = parse(zookeeper->get(database->zookeeper_path + "/max_log_ptr")); logs_to_keep = parse(zookeeper->get(database->zookeeper_path + "/logs_to_keep")); if (our_log_ptr == 0 || our_log_ptr + logs_to_keep < max_log_ptr) + { database->recoverLostReplica(zookeeper, our_log_ptr, max_log_ptr); + } else + { last_skipped_entry_name.emplace(DDLTaskBase::getLogEntryName(our_log_ptr)); + updateLogPointer(DDLTaskBase::getLogEntryName(our_log_ptr)); + } } String DatabaseReplicatedDDLWorker::enqueueQuery(DDLLogEntry & entry) @@ -140,10 +145,10 @@ String DatabaseReplicatedDDLWorker::tryEnqueueAndExecuteEntry(DDLLogEntry & entr /// but it requires more complex logic around /try node. auto zookeeper = getAndSetZooKeeper(); - UInt32 our_log_ptr = parse(zookeeper->get(database->replica_path + "/log_ptr")); + UInt32 our_log_ptr = getLogPointer(); UInt32 max_log_ptr = parse(zookeeper->get(database->zookeeper_path + "/max_log_ptr")); - assert(our_log_ptr <= max_log_ptr); - if (database->db_settings.max_replication_lag_to_enqueue < max_log_ptr - our_log_ptr) + + if (our_log_ptr + database->db_settings.max_replication_lag_to_enqueue < max_log_ptr) throw Exception(ErrorCodes::NOT_A_LEADER, "Cannot enqueue query on this replica, " "because it has replication lag of {} queries. 
Try other replica.", max_log_ptr - our_log_ptr); @@ -203,7 +208,7 @@ DDLTaskPtr DatabaseReplicatedDDLWorker::initAndCheckTask(const String & entry_na } } - UInt32 our_log_ptr = parse(zookeeper->get(fs::path(database->replica_path) / "log_ptr")); + UInt32 our_log_ptr = getLogPointer(); UInt32 entry_num = DatabaseReplicatedTask::getLogEntryNumber(entry_name); if (entry_num <= our_log_ptr) @@ -308,4 +313,18 @@ bool DatabaseReplicatedDDLWorker::canRemoveQueueEntry(const String & entry_name, return entry_number + logs_to_keep < max_log_ptr; } +void DatabaseReplicatedDDLWorker::updateLogPointer(const String & processed_entry_name) +{ + updateMaxDDLEntryID(processed_entry_name); + assert(max_id.load() == parse(getAndSetZooKeeper()->get(fs::path(database->replica_path) / "log_ptr"))); +} + +UInt32 DatabaseReplicatedDDLWorker::getLogPointer() const +{ + /// NOTE it main not be equal to the log_ptr in zk: + /// - max_id can be equal to log_ptr - 1 due to race condition (when it's updated in zk, but not updated in memory yet) + /// - max_id can be greater than log_ptr, because log_ptr is not updated for failed and dummy entries + return max_id.load(); +} + } diff --git a/src/Databases/DatabaseReplicatedWorker.h b/src/Databases/DatabaseReplicatedWorker.h index 6b957e567ff..e23be472c54 100644 --- a/src/Databases/DatabaseReplicatedWorker.h +++ b/src/Databases/DatabaseReplicatedWorker.h @@ -32,6 +32,8 @@ public: static String enqueueQueryImpl(const ZooKeeperPtr & zookeeper, DDLLogEntry & entry, DatabaseReplicated * const database, bool committed = false); /// NOLINT + void updateLogPointer(const String & processed_entry_name); + UInt32 getLogPointer() const; private: bool initializeMainThread() override; void initializeReplication(); diff --git a/src/Interpreters/DDLWorker.cpp b/src/Interpreters/DDLWorker.cpp index 4d2cdf7dd2c..ba5de0c6668 100644 --- a/src/Interpreters/DDLWorker.cpp +++ b/src/Interpreters/DDLWorker.cpp @@ -876,7 +876,7 @@ void DDLWorker::cleanupQueue(Int64, const ZooKeeperPtr & zookeeper) /// We recursively delete all nodes except node_path/finished to prevent staled hosts from /// creating node_path/active node (see createStatusDirs(...)) - zookeeper->tryRemoveChildrenRecursive(node_path, "finished"); + zookeeper->tryRemoveChildrenRecursive(node_path, /* probably_flat */ false, "finished"); /// And then we remove node_path and node_path/finished in a single transaction Coordination::Requests ops; diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index d9f72cf7feb..7f32d85c4f5 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -318,19 +318,22 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( } bool skip_sanity_checks = false; - - if (current_zookeeper && current_zookeeper->exists(replica_path + "/flags/force_restore_data")) + /// It does not make sense for CREATE query + if (attach) { - skip_sanity_checks = true; - current_zookeeper->remove(replica_path + "/flags/force_restore_data"); + if (current_zookeeper && current_zookeeper->exists(replica_path + "/flags/force_restore_data")) + { + skip_sanity_checks = true; + current_zookeeper->remove(replica_path + "/flags/force_restore_data"); - LOG_WARNING(log, "Skipping the limits on severity of changes to data parts and columns (flag {}/flags/force_restore_data).", replica_path); - } - else if (has_force_restore_data_flag) - { - skip_sanity_checks = true; + LOG_WARNING(log, "Skipping the limits on severity of changes to data 
parts and columns (flag {}/flags/force_restore_data).", replica_path); + } + else if (has_force_restore_data_flag) + { + skip_sanity_checks = true; - LOG_WARNING(log, "Skipping the limits on severity of changes to data parts and columns (flag force_restore_data)."); + LOG_WARNING(log, "Skipping the limits on severity of changes to data parts and columns (flag force_restore_data)."); + } } loadDataParts(skip_sanity_checks); @@ -568,35 +571,31 @@ void StorageReplicatedMergeTree::createNewZooKeeperNodes() { auto zookeeper = getZooKeeper(); - /// Working with quorum. - zookeeper->createIfNotExists(zookeeper_path + "/quorum", String()); - zookeeper->createIfNotExists(zookeeper_path + "/quorum/parallel", String()); - zookeeper->createIfNotExists(zookeeper_path + "/quorum/last_part", String()); - zookeeper->createIfNotExists(zookeeper_path + "/quorum/failed_parts", String()); - - /// Tracking lag of replicas. - zookeeper->createIfNotExists(replica_path + "/min_unprocessed_insert_time", String()); - zookeeper->createIfNotExists(replica_path + "/max_processed_insert_time", String()); - - /// Mutations - zookeeper->createIfNotExists(zookeeper_path + "/mutations", String()); - zookeeper->createIfNotExists(replica_path + "/mutation_pointer", String()); + std::vector futures; + futures.push_back(zookeeper->asyncTryCreateNoThrow(zookeeper_path + "/quorum/parallel", String(), zkutil::CreateMode::Persistent)); /// Nodes for remote fs zero-copy replication const auto settings = getSettings(); if (settings->allow_remote_fs_zero_copy_replication) { - zookeeper->createIfNotExists(zookeeper_path + "/zero_copy_s3", String()); - zookeeper->createIfNotExists(zookeeper_path + "/zero_copy_s3/shared", String()); - zookeeper->createIfNotExists(zookeeper_path + "/zero_copy_hdfs", String()); - zookeeper->createIfNotExists(zookeeper_path + "/zero_copy_hdfs/shared", String()); + futures.push_back(zookeeper->asyncTryCreateNoThrow(zookeeper_path + "/zero_copy_s3", String(), zkutil::CreateMode::Persistent)); + futures.push_back(zookeeper->asyncTryCreateNoThrow(zookeeper_path + "/zero_copy_s3/shared", String(), zkutil::CreateMode::Persistent)); + futures.push_back(zookeeper->asyncTryCreateNoThrow(zookeeper_path + "/zero_copy_hdfs", String(), zkutil::CreateMode::Persistent)); + futures.push_back(zookeeper->asyncTryCreateNoThrow(zookeeper_path + "/zero_copy_hdfs/shared", String(), zkutil::CreateMode::Persistent)); } /// Part movement. 
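What saves time in createNewZooKeeperNodes() is replacing one synchronous createIfNotExists() round trip per node with a batch of asyncTryCreateNoThrow() calls that are in flight concurrently and validated afterwards, treating ZNODEEXISTS as success. A condensed sketch of the same pattern (the lambda name is hypothetical; the calls are the ones used in this hunk):

    auto create_in_background = [&](const String & path)
    {
        return zookeeper->asyncTryCreateNoThrow(path, String(), zkutil::CreateMode::Persistent);
    };

    auto f1 = create_in_background(zookeeper_path + "/quorum/parallel");
    auto f2 = create_in_background(zookeeper_path + "/alter_partition_version");

    /// Both requests are already running; now collect and check the results,
    /// tolerating nodes that were created earlier or by another replica.
    for (auto * future : {&f1, &f2})
    {
        auto res = future->get();
        if (res.error != Coordination::Error::ZOK && res.error != Coordination::Error::ZNODEEXISTS)
            throw Coordination::Exception(fmt::format("Failed to create new nodes at {}", zookeeper_path), res.error);
    }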
- zookeeper->createIfNotExists(zookeeper_path + "/part_moves_shard", String()); - zookeeper->createIfNotExists(zookeeper_path + "/pinned_part_uuids", getPinnedPartUUIDs()->toString()); + futures.push_back(zookeeper->asyncTryCreateNoThrow(zookeeper_path + "/part_moves_shard", String(), zkutil::CreateMode::Persistent)); + futures.push_back(zookeeper->asyncTryCreateNoThrow(zookeeper_path + "/pinned_part_uuids", getPinnedPartUUIDs()->toString(), zkutil::CreateMode::Persistent)); /// For ALTER PARTITION with multi-leaders - zookeeper->createIfNotExists(zookeeper_path + "/alter_partition_version", String()); + futures.push_back(zookeeper->asyncTryCreateNoThrow(zookeeper_path + "/alter_partition_version", String(), zkutil::CreateMode::Persistent)); + + for (auto & future : futures) + { + auto res = future.get(); + if (res.error != Coordination::Error::ZOK && res.error != Coordination::Error::ZNODEEXISTS) + throw Coordination::Exception(fmt::format("Failed to create new nodes at {}", zookeeper_path), res.error); + } } @@ -671,6 +670,16 @@ bool StorageReplicatedMergeTree::createTableIfNotExists(const StorageMetadataPtr ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/replicas", "last added replica: " + replica_name, zkutil::CreateMode::Persistent)); + /// The following 4 nodes were added in version 1.1.xxx, so we create them here, not in createNewZooKeeperNodes() + ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/quorum", "", + zkutil::CreateMode::Persistent)); + ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/quorum/last_part", "", + zkutil::CreateMode::Persistent)); + ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/quorum/failed_parts", "", + zkutil::CreateMode::Persistent)); + ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/mutations", "", + zkutil::CreateMode::Persistent)); + /// And create first replica atomically. See also "createReplica" method that is used to create not the first replicas. 
ops.emplace_back(zkutil::makeCreateRequest(replica_path, "", @@ -694,6 +703,14 @@ bool StorageReplicatedMergeTree::createTableIfNotExists(const StorageMetadataPtr ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/metadata_version", std::to_string(metadata_version), zkutil::CreateMode::Persistent)); + /// The following 3 nodes were added in version 1.1.xxx, so we create them here, not in createNewZooKeeperNodes() + ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/min_unprocessed_insert_time", "", + zkutil::CreateMode::Persistent)); + ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/max_processed_insert_time", "", + zkutil::CreateMode::Persistent)); + ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/mutation_pointer", "", + zkutil::CreateMode::Persistent)); + Coordination::Responses responses; auto code = zookeeper->tryMulti(ops, responses); if (code == Coordination::Error::ZNODEEXISTS) @@ -759,6 +776,14 @@ void StorageReplicatedMergeTree::createReplica(const StorageMetadataPtr & metada ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/metadata_version", std::to_string(metadata_version), zkutil::CreateMode::Persistent)); + /// The following 3 nodes were added in version 1.1.xxx, so we create them here, not in createNewZooKeeperNodes() + ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/min_unprocessed_insert_time", "", + zkutil::CreateMode::Persistent)); + ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/max_processed_insert_time", "", + zkutil::CreateMode::Persistent)); + ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/mutation_pointer", "", + zkutil::CreateMode::Persistent)); + /// Check version of /replicas to see if there are any replicas created at the same moment of time. ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/replicas", "last added replica: " + replica_name, replicas_stat.version)); @@ -840,18 +865,42 @@ void StorageReplicatedMergeTree::dropReplica(zkutil::ZooKeeperPtr zookeeper, con /// "The local set of parts of table X doesn't look like the set of parts in ZooKeeper" /// { - Strings children = zookeeper->getChildren(remote_replica_path); + /// Remove metadata first + [[maybe_unused]] auto code = zookeeper->tryRemove(fs::path(remote_replica_path) / "metadata"); + assert(code == Coordination::Error::ZOK || code == Coordination::Error::ZNONODE); - if (std::find(children.begin(), children.end(), "metadata") != children.end()) - zookeeper->remove(fs::path(remote_replica_path) / "metadata"); - - for (const auto & child : children) + /// Then try to remove paths that are known to be flat (all children are leafs) + Strings flat_nodes = {"flags", "parts", "queue"}; + for (const auto & node : flat_nodes) { - if (child != "metadata") - zookeeper->removeRecursive(fs::path(remote_replica_path) / child); + bool removed_quickly = zookeeper->tryRemoveChildrenRecursive(fs::path(zookeeper_path) / node, /* probably flat */ true); + if (!removed_quickly) + LOG_WARNING(logger, "Cannot quickly remove node {} and its children (replica: {}). 
Will remove recursively.", + node, remote_replica_path); } - zookeeper->remove(remote_replica_path); + /// Then try to remove nodes that are known to have no children (and should always exist) + Coordination::Requests ops; + for (const auto & node : flat_nodes) + ops.emplace_back(zkutil::makeRemoveRequest(remote_replica_path + "/" + node, -1)); + + ops.emplace_back(zkutil::makeRemoveRequest(remote_replica_path + "/columns", -1)); + ops.emplace_back(zkutil::makeRemoveRequest(remote_replica_path + "/host", -1)); + ops.emplace_back(zkutil::makeRemoveRequest(remote_replica_path + "/is_lost", -1)); + ops.emplace_back(zkutil::makeRemoveRequest(remote_replica_path + "/log_pointer", -1)); + ops.emplace_back(zkutil::makeRemoveRequest(remote_replica_path + "/max_processed_insert_time", -1)); + ops.emplace_back(zkutil::makeRemoveRequest(remote_replica_path + "/min_unprocessed_insert_time", -1)); + ops.emplace_back(zkutil::makeRemoveRequest(remote_replica_path + "/metadata_version", -1)); + ops.emplace_back(zkutil::makeRemoveRequest(remote_replica_path + "/mutation_pointer", -1)); + Coordination::Responses res; + code = zookeeper->tryMulti(ops, res); + if (code != Coordination::Error::ZOK) + LOG_WARNING(logger, "Cannot quickly remove nodes without children: {} (replica: {}). Will remove recursively.", + Coordination::errorMessage(code), remote_replica_path); + + + /// And finally remove everything else recursively + zookeeper->tryRemoveRecursive(remote_replica_path); } /// It may left some garbage if replica_path subtree are concurrently modified @@ -911,17 +960,50 @@ bool StorageReplicatedMergeTree::removeTableNodesFromZooKeeper(zkutil::ZooKeeper const String & zookeeper_path, const zkutil::EphemeralNodeHolder::Ptr & metadata_drop_lock, Poco::Logger * logger) { bool completely_removed = false; + + Strings flat_nodes = {"block_numbers", "blocks", "leader_election", "log", "mutations", "pinned_part_uuids", "log"}; + + /// First try to remove paths that are known to be flat + for (const auto & node : flat_nodes) + { + bool removed_quickly = zookeeper->tryRemoveChildrenRecursive(fs::path(zookeeper_path) / node, /* probably flat */ true); + if (!removed_quickly) + LOG_WARNING(logger, "Cannot quickly remove node {} and its children (table: {}). 
Will remove recursively.", + node, zookeeper_path); + } + + /// Then try to remove nodes that are known to have no children (and should always exist) + Coordination::Requests ops; + for (const auto & node : flat_nodes) + ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path + "/" + node, -1)); + + ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path + "/alter_partition_version", -1)); + ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path + "/columns", -1)); + ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path + "/metadata", -1)); + ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path + "/table_shared_id", -1)); + ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path + "/max_processed_insert_time", -1)); + ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path + "/min_unprocessed_insert_time", -1)); + ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path + "/metadata_version", -1)); + ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path + "/mutation_pointer", -1)); + ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path + "/table_shared_id", -1)); + Coordination::Responses res; + auto code = zookeeper->tryMulti(ops, res); + if (code != Coordination::Error::ZOK) + LOG_WARNING(logger, "Cannot quickly remove nodes without children: {} (table: {}). Will remove recursively.", + Coordination::errorMessage(code), zookeeper_path); + Strings children; - Coordination::Error code = zookeeper->tryGetChildren(zookeeper_path, children); + code = zookeeper->tryGetChildren(zookeeper_path, children); if (code == Coordination::Error::ZNONODE) throw Exception(ErrorCodes::LOGICAL_ERROR, "There is a race condition between creation and removal of replicated table. It's a bug"); - for (const auto & child : children) + { if (child != "dropped") zookeeper->tryRemoveRecursive(fs::path(zookeeper_path) / child); + } - Coordination::Requests ops; + ops.clear(); Coordination::Responses responses; ops.emplace_back(zkutil::makeRemoveRequest(metadata_drop_lock->getPath(), -1)); ops.emplace_back(zkutil::makeRemoveRequest(fs::path(zookeeper_path) / "dropped", -1)); @@ -4327,7 +4409,7 @@ std::optional StorageReplicatedMergeTree::totalBytes(const Settings & se void StorageReplicatedMergeTree::assertNotReadonly() const { if (is_readonly) - throw Exception(ErrorCodes::TABLE_IS_READ_ONLY, "Table is in readonly mode (zookeeper path: {})", zookeeper_path); + throw Exception(ErrorCodes::TABLE_IS_READ_ONLY, "Table is in readonly mode (replica path: {})", replica_path); } From 37a06eec1a062e5ac8ed9a179051491ea98e285c Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Tue, 5 Apr 2022 17:36:53 +0200 Subject: [PATCH 2/3] fixes --- src/Databases/DatabaseReplicated.cpp | 2 -- src/Databases/DatabaseReplicatedWorker.cpp | 11 +++++---- src/Databases/DatabaseReplicatedWorker.h | 2 +- src/Interpreters/DDLWorker.cpp | 3 +-- src/Storages/StorageReplicatedMergeTree.cpp | 25 ++++++++++----------- src/Storages/StorageReplicatedMergeTree.h | 3 ++- 6 files changed, 23 insertions(+), 23 deletions(-) diff --git a/src/Databases/DatabaseReplicated.cpp b/src/Databases/DatabaseReplicated.cpp index 2337a063a5e..d94eceb7dec 100644 --- a/src/Databases/DatabaseReplicated.cpp +++ b/src/Databases/DatabaseReplicated.cpp @@ -674,8 +674,6 @@ void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeep LOG_INFO(log, "Marked recovered {} as finished", entry_name); } } - current_zookeeper->set(replica_path + "/log_ptr", toString(max_log_ptr)); - 
ddl_worker->updateLogPointer(DDLTaskBase::getLogEntryName(max_log_ptr)); } std::map DatabaseReplicated::tryGetConsistentMetadataSnapshot(const ZooKeeperPtr & zookeeper, UInt32 & max_log_ptr) diff --git a/src/Databases/DatabaseReplicatedWorker.cpp b/src/Databases/DatabaseReplicatedWorker.cpp index e9475b56377..84c3f857a81 100644 --- a/src/Databases/DatabaseReplicatedWorker.cpp +++ b/src/Databases/DatabaseReplicatedWorker.cpp @@ -68,11 +68,14 @@ void DatabaseReplicatedDDLWorker::initializeReplication() if (our_log_ptr == 0 || our_log_ptr + logs_to_keep < max_log_ptr) { database->recoverLostReplica(zookeeper, our_log_ptr, max_log_ptr); + zookeeper->set(database->replica_path + "/log_ptr", toString(max_log_ptr)); + initializeLogPointer(DDLTaskBase::getLogEntryName(max_log_ptr)); } else { - last_skipped_entry_name.emplace(DDLTaskBase::getLogEntryName(our_log_ptr)); - updateLogPointer(DDLTaskBase::getLogEntryName(our_log_ptr)); + String log_entry_name = DDLTaskBase::getLogEntryName(our_log_ptr); + last_skipped_entry_name.emplace(log_entry_name); + initializeLogPointer(log_entry_name); } } @@ -313,7 +316,7 @@ bool DatabaseReplicatedDDLWorker::canRemoveQueueEntry(const String & entry_name, return entry_number + logs_to_keep < max_log_ptr; } -void DatabaseReplicatedDDLWorker::updateLogPointer(const String & processed_entry_name) +void DatabaseReplicatedDDLWorker::initializeLogPointer(const String & processed_entry_name) { updateMaxDDLEntryID(processed_entry_name); assert(max_id.load() == parse(getAndSetZooKeeper()->get(fs::path(database->replica_path) / "log_ptr"))); @@ -321,7 +324,7 @@ void DatabaseReplicatedDDLWorker::updateLogPointer(const String & processed_entr UInt32 DatabaseReplicatedDDLWorker::getLogPointer() const { - /// NOTE it main not be equal to the log_ptr in zk: + /// NOTE it may not be equal to the log_ptr in zk: /// - max_id can be equal to log_ptr - 1 due to race condition (when it's updated in zk, but not updated in memory yet) /// - max_id can be greater than log_ptr, because log_ptr is not updated for failed and dummy entries return max_id.load(); diff --git a/src/Databases/DatabaseReplicatedWorker.h b/src/Databases/DatabaseReplicatedWorker.h index e23be472c54..3c53d288841 100644 --- a/src/Databases/DatabaseReplicatedWorker.h +++ b/src/Databases/DatabaseReplicatedWorker.h @@ -32,11 +32,11 @@ public: static String enqueueQueryImpl(const ZooKeeperPtr & zookeeper, DDLLogEntry & entry, DatabaseReplicated * const database, bool committed = false); /// NOLINT - void updateLogPointer(const String & processed_entry_name); UInt32 getLogPointer() const; private: bool initializeMainThread() override; void initializeReplication(); + void initializeLogPointer(const String & processed_entry_name); DDLTaskPtr initAndCheckTask(const String & entry_name, String & out_reason, const ZooKeeperPtr & zookeeper) override; bool canRemoveQueueEntry(const String & entry_name, const Coordination::Stat & stat) override; diff --git a/src/Interpreters/DDLWorker.cpp b/src/Interpreters/DDLWorker.cpp index ba5de0c6668..9af6b61a0c1 100644 --- a/src/Interpreters/DDLWorker.cpp +++ b/src/Interpreters/DDLWorker.cpp @@ -632,8 +632,6 @@ void DDLWorker::processTask(DDLTaskBase & task, const ZooKeeperPtr & zookeeper) task.was_executed = true; } - updateMaxDDLEntryID(task.entry_name); - /// Step 3: Create node in finished/ status dir and write execution status. /// FIXME: if server fails right here, the task will be executed twice. We need WAL here. 
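The call to updateMaxDDLEntryID() is moved below, to after the task is marked completely processed (see the hunk that follows), and the new getLogPointer() simply returns max_id.load(), so callers no longer need a ZooKeeper read of log_ptr. As a minimal sketch of how such an in-memory pointer can be kept monotonic under concurrent updates (independent of the actual DDLWorker implementation, which is not shown in this diff):

    std::atomic<UInt32> max_id{0};

    void updateTo(UInt32 entry_number)
    {
        UInt32 prev = max_id.load();
        /// compare_exchange_weak refreshes `prev` on failure, so the loop ends
        /// once we either published entry_number or observed a larger value.
        while (prev < entry_number && !max_id.compare_exchange_weak(prev, entry_number))
        {
        }
    }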
/// NOTE: If ZooKeeper connection is lost here, we will try again to write query status. @@ -650,6 +648,7 @@ void DDLWorker::processTask(DDLTaskBase & task, const ZooKeeperPtr & zookeeper) active_node->setAlreadyRemoved(); task.completely_processed = true; + updateMaxDDLEntryID(task.entry_name); } diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index ff37b98bbb6..1127337adff 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -828,13 +828,14 @@ void StorageReplicatedMergeTree::drop() throw Exception("Can't drop readonly replicated table (need to drop data in ZooKeeper as well)", ErrorCodes::TABLE_IS_READ_ONLY); shutdown(); - dropReplica(zookeeper, zookeeper_path, replica_name, log); + dropReplica(zookeeper, zookeeper_path, replica_name, log, getSettings()); } dropAllData(); } -void StorageReplicatedMergeTree::dropReplica(zkutil::ZooKeeperPtr zookeeper, const String & zookeeper_path, const String & replica, Poco::Logger * logger) +void StorageReplicatedMergeTree::dropReplica(zkutil::ZooKeeperPtr zookeeper, const String & zookeeper_path, const String & replica, + Poco::Logger * logger, MergeTreeSettingsPtr table_settings) { if (zookeeper->expired()) throw Exception("Table was not dropped because ZooKeeper session has expired.", ErrorCodes::TABLE_WAS_NOT_DROPPED); @@ -871,12 +872,14 @@ void StorageReplicatedMergeTree::dropReplica(zkutil::ZooKeeperPtr zookeeper, con assert(code == Coordination::Error::ZOK || code == Coordination::Error::ZNONODE); /// Then try to remove paths that are known to be flat (all children are leafs) - Strings flat_nodes = {"flags", "parts", "queue"}; + Strings flat_nodes = {"flags", "queue"}; + if (table_settings && table_settings->use_minimalistic_part_header_in_zookeeper) + flat_nodes.emplace_back("parts"); for (const auto & node : flat_nodes) { - bool removed_quickly = zookeeper->tryRemoveChildrenRecursive(fs::path(zookeeper_path) / node, /* probably flat */ true); + bool removed_quickly = zookeeper->tryRemoveChildrenRecursive(fs::path(remote_replica_path) / node, /* probably flat */ true); if (!removed_quickly) - LOG_WARNING(logger, "Cannot quickly remove node {} and its children (replica: {}). Will remove recursively.", + LOG_WARNING(logger, "Failed to quickly remove node '{}' and its children, fell back to recursive removal (replica: {})", node, remote_replica_path); } @@ -899,7 +902,6 @@ void StorageReplicatedMergeTree::dropReplica(zkutil::ZooKeeperPtr zookeeper, con LOG_WARNING(logger, "Cannot quickly remove nodes without children: {} (replica: {}). Will remove recursively.", Coordination::errorMessage(code), remote_replica_path); - /// And finally remove everything else recursively zookeeper->tryRemoveRecursive(remote_replica_path); } @@ -962,14 +964,16 @@ bool StorageReplicatedMergeTree::removeTableNodesFromZooKeeper(zkutil::ZooKeeper { bool completely_removed = false; - Strings flat_nodes = {"block_numbers", "blocks", "leader_election", "log", "mutations", "pinned_part_uuids", "log"}; + /// NOTE /block_numbers/ actually is not flat, because /block_numbers// may have ephemeral children, + /// but we assume that all ephemeral block locks are already removed when table is being dropped. 
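The NOTE above refers to the flat_nodes list declared just below; both removeTableNodesFromZooKeeper() and dropReplica() now follow the same three-step removal: empty the big, known-flat subtrees in multi-remove batches, drop the known-childless nodes in one transaction, and only then fall back to generic recursive removal. A condensed sketch of the pattern (function and parameter names are hypothetical; the calls are the ones used in these hunks):

    void fastDropSubtree(zkutil::ZooKeeper & zk, const String & root,
                         const Strings & flat_children, const Strings & leaf_children)
    {
        /// 1. Children of these nodes are (probably) all leaves: remove them
        ///    in large multi-remove batches instead of per-node round trips.
        for (const auto & node : flat_children)
            zk.tryRemoveChildrenRecursive(fs::path(root) / node, /* probably_flat */ true);

        /// 2. Remove the now-empty nodes and the plain leaf nodes in one request.
        Coordination::Requests ops;
        for (const auto & node : flat_children)
            ops.emplace_back(zkutil::makeRemoveRequest(root + "/" + node, -1));
        for (const auto & node : leaf_children)
            ops.emplace_back(zkutil::makeRemoveRequest(root + "/" + node, -1));
        Coordination::Responses responses;
        zk.tryMulti(ops, responses);    /// best effort; leftovers are handled below

        /// 3. Whatever survived (unexpected children, missing nodes) goes the slow way.
        zk.tryRemoveRecursive(root);
    }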
+ static constexpr std::array flat_nodes = {"block_numbers", "blocks", "leader_election", "log", "mutations", "pinned_part_uuids"}; /// First try to remove paths that are known to be flat for (const auto & node : flat_nodes) { bool removed_quickly = zookeeper->tryRemoveChildrenRecursive(fs::path(zookeeper_path) / node, /* probably flat */ true); if (!removed_quickly) - LOG_WARNING(logger, "Cannot quickly remove node {} and its children (table: {}). Will remove recursively.", + LOG_WARNING(logger, "Failed to quickly remove node '{}' and its children, fell back to recursive removal (table: {})", node, zookeeper_path); } @@ -982,11 +986,6 @@ bool StorageReplicatedMergeTree::removeTableNodesFromZooKeeper(zkutil::ZooKeeper ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path + "/columns", -1)); ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path + "/metadata", -1)); ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path + "/table_shared_id", -1)); - ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path + "/max_processed_insert_time", -1)); - ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path + "/min_unprocessed_insert_time", -1)); - ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path + "/metadata_version", -1)); - ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path + "/mutation_pointer", -1)); - ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path + "/table_shared_id", -1)); Coordination::Responses res; auto code = zookeeper->tryMulti(ops, res); if (code != Coordination::Error::ZOK) diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index c567447e9f2..317544c8bb8 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -214,7 +214,8 @@ public: /** Remove a specific replica from zookeeper. 
*/ - static void dropReplica(zkutil::ZooKeeperPtr zookeeper, const String & zookeeper_path, const String & replica, Poco::Logger * logger); + static void dropReplica(zkutil::ZooKeeperPtr zookeeper, const String & zookeeper_path, const String & replica, + Poco::Logger * logger, MergeTreeSettingsPtr table_settings = nullptr); /// Removes table from ZooKeeper after the last replica was dropped static bool removeTableNodesFromZooKeeper(zkutil::ZooKeeperPtr zookeeper, const String & zookeeper_path, From 1309e781b624fdfa5c5886eb25f5f2ee52c5fe89 Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Wed, 6 Apr 2022 13:56:26 +0200 Subject: [PATCH 3/3] apply suggestion --- src/Storages/StorageReplicatedMergeTree.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 1127337adff..39840f91325 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -969,7 +969,7 @@ bool StorageReplicatedMergeTree::removeTableNodesFromZooKeeper(zkutil::ZooKeeper static constexpr std::array flat_nodes = {"block_numbers", "blocks", "leader_election", "log", "mutations", "pinned_part_uuids"}; /// First try to remove paths that are known to be flat - for (const auto & node : flat_nodes) + for (const auto * node : flat_nodes) { bool removed_quickly = zookeeper->tryRemoveChildrenRecursive(fs::path(zookeeper_path) / node, /* probably flat */ true); if (!removed_quickly) @@ -979,7 +979,7 @@ bool StorageReplicatedMergeTree::removeTableNodesFromZooKeeper(zkutil::ZooKeeper /// Then try to remove nodes that are known to have no children (and should always exist) Coordination::Requests ops; - for (const auto & node : flat_nodes) + for (const auto * node : flat_nodes) ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path + "/" + node, -1)); ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_path + "/alter_partition_version", -1));
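The last commit only changes the loop variable from const auto & to const auto *: the elements of the constexpr std::array are raw const char * pointers, so spelling the pointer out is clearer and is likely what checks such as clang-tidy's readability-qualified-auto expect. A standalone illustration (the array contents here are arbitrary):

    #include <array>
    #include <iostream>

    int main()
    {
        static constexpr std::array<const char *, 3> flat_nodes{"log", "blocks", "mutations"};

        /// `node` is deduced as const char *; `const auto &` would instead bind
        /// a reference to the pointer, which readability checks flag.
        for (const auto * node : flat_nodes)
            std::cout << node << '\n';
    }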