diff --git a/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp b/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp index 7b29472e389..5104be11e2b 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp @@ -179,10 +179,15 @@ ReplicatedMergeTreePartCheckThread::MissingPartSearchResult ReplicatedMergeTreeP found_part_with_the_same_min_block = true; parts_found.push_back(part_on_replica); } + if (part_on_replica_info.max_block == part_info.max_block) { found_part_with_the_same_max_block = true; - parts_found.push_back(part_on_replica); + + /// If we are looking for part like partition_X_X_level we can add part + /// partition_X_X_(level-1) two times, avoiding it + if (parts_found.empty() || parts_found.back() != part_on_replica) + parts_found.push_back(part_on_replica); } if (found_part_with_the_same_min_block && found_part_with_the_same_max_block) diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 7056e6a6952..7035df09b3d 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -3498,21 +3498,55 @@ void StorageReplicatedMergeTree::removePartAndEnqueueFetch(const String & part_n removePartFromZooKeeper(part_name, ops, stat.numChildren > 0); } - LogEntryPtr log_entry = std::make_shared(); - log_entry->type = LogEntry::GET_PART; - log_entry->create_time = part_create_time; - log_entry->source_replica = ""; - log_entry->new_part_name = part_name; - ops.emplace_back(zkutil::makeCreateRequest( - fs::path(replica_path) / "queue/queue-", log_entry->toString(), - zkutil::CreateMode::PersistentSequential)); + while (true) + { + /// We use merge predicate + version check here, because DROP RANGE update log version and we are trying to avoid race with it. We must be sure, that our part + /// was not dropped, otherwise we will have fetch entry, but no virtual part for it (DROP RANGE will remove it). So bad sequence is the following: + /// 1) Create DROP PART in log for broken_part (for example because it's not only broken, but also empty) + /// 1.5) Pull to log and start execution (removePartProducingOpsInRange) + /// 2) Execute this method (create GET PART for broken part) + /// 3) Finish DROP PART execution (remove part from virtual parts) + /// 4) Now we get GET PART for broken part which is absent in virtual parts == bug + /// + /// Because of version check this method will never create FETCH if drop part exists + ReplicatedMergeTreeMergePredicate merge_pred = queue.getMergePredicate(zookeeper, PartitionIdsHint{broken_part_info.partition_id}); + if (merge_pred.hasDropRange(broken_part_info)) + { + LOG_INFO(log, "Broken part is covered by DROP RANGE, don't need to fetch it"); + return; + } - auto results = zookeeper->multi(ops); + LogEntryPtr log_entry = std::make_shared(); + log_entry->type = LogEntry::GET_PART; + log_entry->create_time = part_create_time; + log_entry->source_replica = ""; + log_entry->new_part_name = part_name; - String path_created = dynamic_cast(*results.back()).path_created; - log_entry->znode_name = path_created.substr(path_created.find_last_of('/') + 1); - queue.insert(zookeeper, log_entry); + /// Check that our version of log (and queue) is the most fresh. Otherwise don't create new entry fetch entry. + ops.emplace_back(zkutil::makeCheckRequest(fs::path(zookeeper_path) / "log", merge_pred.getVersion())); + + ops.emplace_back(zkutil::makeCreateRequest( + fs::path(replica_path) / "queue/queue-", log_entry->toString(), + zkutil::CreateMode::PersistentSequential)); + + + Coordination::Responses results; + auto rc = zookeeper->tryMulti(ops, results); + + if (rc == Coordination::Error::ZBADVERSION) + { + LOG_TRACE(log, "Log updated, cannot create fetch entry for part {}, will retry.", part_name); + continue; + } + else + zkutil::KeeperMultiException::check(rc, ops, results); + + String path_created = dynamic_cast(*results.back()).path_created; + log_entry->znode_name = path_created.substr(path_created.find_last_of('/') + 1); + queue.insert(zookeeper, log_entry); + break; + } } @@ -7448,6 +7482,7 @@ bool StorageReplicatedMergeTree::dropPartImpl( String log_znode_path = dynamic_cast(*responses[clear_block_ops_size + 1]).path_created; entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); + LOG_TRACE(log, "DROP RANGE for part {} inserted with znode name {}", part_name, entry.znode_name); return true; } }