From 69f579b8ecf9b618a67b41d778a14fe31a48445f Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Mon, 13 Feb 2023 14:46:46 +0100 Subject: [PATCH] remove an unused argument --- .../MergeTree/ReplicatedMergeTreeQueue.cpp | 35 +++++-------------- .../MergeTree/ReplicatedMergeTreeQueue.h | 3 +- src/Storages/StorageReplicatedMergeTree.cpp | 6 ++-- 3 files changed, 12 insertions(+), 32 deletions(-) diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index 3ddafa900a1..f5760899f37 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -1107,8 +1107,7 @@ bool ReplicatedMergeTreeQueue::checkReplaceRangeCanBeRemoved(const MergeTreePart void ReplicatedMergeTreeQueue::removePartProducingOpsInRange( zkutil::ZooKeeperPtr zookeeper, const MergeTreePartInfo & part_info, - const std::optional & covering_entry, - const String & fetch_entry_znode) + const std::optional & covering_entry) { /// TODO is it possible to simplify it? Queue to_wait; @@ -1122,40 +1121,22 @@ void ReplicatedMergeTreeQueue::removePartProducingOpsInRange( [[maybe_unused]] bool called_from_alter_query_directly = covering_entry && covering_entry->replace_range_entry && covering_entry->replace_range_entry->columns_version < 0; [[maybe_unused]] bool called_for_broken_part = !covering_entry; - assert(currently_executing_drop_replace_ranges.contains(part_info) || called_from_alter_query_directly || called_for_broken_part || !fetch_entry_znode.empty()); - - auto is_simple_part_producing_op = [](const ReplicatedMergeTreeLogEntryData & data) - { - return data.type == LogEntry::GET_PART || - data.type == LogEntry::ATTACH_PART || - data.type == LogEntry::MERGE_PARTS || - data.type == LogEntry::MUTATE_PART; - }; + assert(currently_executing_drop_replace_ranges.contains(part_info) || called_from_alter_query_directly || called_for_broken_part); for (Queue::iterator it = queue.begin(); it != queue.end();) { - /// Skipping currently processing entry - if (!fetch_entry_znode.empty() && (*it)->znode_name == fetch_entry_znode) - { - ++it; - continue; - } - - bool is_simple_producing_op = is_simple_part_producing_op(**it); - + auto type = (*it)->type; + bool is_simple_producing_op = type == LogEntry::GET_PART || + type == LogEntry::ATTACH_PART || + type == LogEntry::MERGE_PARTS || + type == LogEntry::MUTATE_PART; bool simple_op_covered = is_simple_producing_op && part_info.contains(MergeTreePartInfo::fromPartName((*it)->new_part_name, format_version)); bool replace_range_covered = covering_entry && checkReplaceRangeCanBeRemoved(part_info, *it, *covering_entry); if (simple_op_covered || replace_range_covered) { if ((*it)->currently_executing) - { - bool is_covered_by_simple_op = covering_entry && is_simple_part_producing_op(*covering_entry); - bool is_fetching_covering_part = !fetch_entry_znode.empty(); - if (is_covered_by_simple_op || is_fetching_covering_part) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot remove covered entry {} producing parts {}, it's a bug", - (*it)->znode_name, fmt::join((*it)->getVirtualPartNames(format_version), ", ")); to_wait.push_back(*it); - } + auto code = zookeeper->tryRemove(fs::path(replica_path) / "queue" / (*it)->znode_name); if (code != Coordination::Error::ZOK) LOG_INFO(log, "Couldn't remove {}: {}", (fs::path(replica_path) / "queue" / (*it)->znode_name).string(), Coordination::errorMessage(code)); diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h index 99a82cf3b64..36552129690 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h @@ -340,8 +340,7 @@ public: */ void removePartProducingOpsInRange(zkutil::ZooKeeperPtr zookeeper, const MergeTreePartInfo & part_info, - const std::optional & covering_entry, - const String & fetch_entry_znode); + const std::optional & covering_entry); /** In the case where there are not enough parts to perform the merge in part_name * - move actions with merged parts to the end of the queue diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index d3590657a5c..b8091ecebfb 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -1888,7 +1888,7 @@ void StorageReplicatedMergeTree::executeDropRange(const LogEntry & entry) getContext()->getMergeList().cancelInPartition(getStorageID(), drop_range_info.partition_id, drop_range_info.max_block); { auto pause_checking_parts = part_check_thread.pausePartsCheck(); - queue.removePartProducingOpsInRange(getZooKeeper(), drop_range_info, entry, /* fetch_entry_znode= */ {}); + queue.removePartProducingOpsInRange(getZooKeeper(), drop_range_info, entry); part_check_thread.cancelRemovedPartsCheck(drop_range_info); } @@ -1967,7 +1967,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry) { getContext()->getMergeList().cancelInPartition(getStorageID(), drop_range.partition_id, drop_range.max_block); auto pause_checking_parts = part_check_thread.pausePartsCheck(); - queue.removePartProducingOpsInRange(getZooKeeper(), drop_range, entry, /* fetch_entry_znode= */ {}); + queue.removePartProducingOpsInRange(getZooKeeper(), drop_range, entry); part_check_thread.cancelRemovedPartsCheck(drop_range); } else @@ -3512,7 +3512,7 @@ void StorageReplicatedMergeTree::removePartAndEnqueueFetch(const String & part_n /// so GET_PART all_1_42_5 (and all source parts) is useless. The only thing we can do is to fetch all_1_42_5_63. /// 2. If all_1_42_5_63 is lost, then replication may stuck waiting for all_1_42_5_63 to appear, /// because we may have some covered parts (more precisely, parts with the same min and max blocks) - queue.removePartProducingOpsInRange(zookeeper, broken_part_info, /* covering_entry= */ {}, /* fetch_entry_znode= */ {}); + queue.removePartProducingOpsInRange(zookeeper, broken_part_info, /* covering_entry= */ {}); String part_path = fs::path(replica_path) / "parts" / part_name;