From 2571ad7d438297d9aa799783c2301f520718572c Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Mon, 31 May 2021 00:30:50 +0300 Subject: [PATCH] try fix intersecting virtual parts --- .../MergeTree/ReplicatedMergeTreeLogEntry.cpp | 40 ++- .../MergeTree/ReplicatedMergeTreeLogEntry.h | 14 + .../MergeTree/ReplicatedMergeTreeQueue.cpp | 271 +++++++++++++++--- .../MergeTree/ReplicatedMergeTreeQueue.h | 16 +- 4 files changed, 297 insertions(+), 44 deletions(-) diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.cpp index dbd55bc4ff3..ee802a2a5b1 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.cpp @@ -152,6 +152,10 @@ void ReplicatedMergeTreeLogEntryData::writeText(WriteBuffer & out) const out << "sync_pinned_part_uuids\n"; break; + case PART_IS_LOST: + out << "lost\n" << new_part_name; + break; + default: throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown log entry type: {}", static_cast(type)); } @@ -326,6 +330,11 @@ void ReplicatedMergeTreeLogEntryData::readText(ReadBuffer & in) in >> new_part_name; in >> "\nsource_shard: " >> source_shard; } + else if (type_str == "lost") + { + type = PART_IS_LOST; + in >> new_part_name; + } if (!trailing_newline_found) in >> "\n"; @@ -389,7 +398,6 @@ void ReplicatedMergeTreeLogEntryData::ReplaceRangeEntry::readText(ReadBuffer & i bool ReplicatedMergeTreeLogEntryData::ReplaceRangeEntry::isMovePartitionOrAttachFrom(const MergeTreePartInfo & drop_range_info) { - assert(drop_range_info.getBlocksCount() != 0); return drop_range_info.getBlocksCount() == 1; } @@ -413,6 +421,24 @@ ReplicatedMergeTreeLogEntry::Ptr ReplicatedMergeTreeLogEntry::parse(const String return res; } +std::optional ReplicatedMergeTreeLogEntryData::getDropRange(MergeTreeDataFormatVersion format_version) const +{ + if (type == DROP_RANGE) + return new_part_name; + + if (type == REPLACE_RANGE) + { + auto drop_range_info 
= MergeTreePartInfo::fromPartName(replace_range_entry->drop_range_part_name, format_version); + if (!ReplaceRangeEntry::isMovePartitionOrAttachFrom(drop_range_info)) + { + /// It's REPLACE, not MOVE or ATTACH, so drop range is real + return replace_range_entry->drop_range_part_name; + } + } + + return {}; +} + Strings ReplicatedMergeTreeLogEntryData::getVirtualPartNames(MergeTreeDataFormatVersion format_version) const { /// Doesn't produce any part @@ -430,12 +456,8 @@ Strings ReplicatedMergeTreeLogEntryData::getVirtualPartNames(MergeTreeDataFormat if (type == REPLACE_RANGE) { Strings res = replace_range_entry->new_part_names; - auto drop_range_info = MergeTreePartInfo::fromPartName(replace_range_entry->drop_range_part_name, format_version); - if (!ReplaceRangeEntry::isMovePartitionOrAttachFrom(drop_range_info)) - { - /// It's REPLACE, not MOVE or ATTACH, so drop range is real - res.emplace_back(replace_range_entry->drop_range_part_name); - } + if (auto drop_range = getDropRange(format_version)) + res.emplace_back(*drop_range); return res; } @@ -447,6 +469,10 @@ Strings ReplicatedMergeTreeLogEntryData::getVirtualPartNames(MergeTreeDataFormat if (type == CLONE_PART_FROM_SHARD) return {}; + /// Doesn't produce any part. + if (type == PART_IS_LOST) + return {}; + return {new_part_name}; } diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h b/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h index 12f1c78fb5d..5d62f219924 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h +++ b/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h @@ -45,6 +45,7 @@ struct ReplicatedMergeTreeLogEntryData ALTER_METADATA, /// Apply alter modification according to global /metadata and /columns paths SYNC_PINNED_PART_UUIDS, /// Synchronization point for ensuring that all replicas have up to date in-memory state. CLONE_PART_FROM_SHARD, /// Clone part from another shard. + PART_IS_LOST, /// Cancels previous operations with lost data part. Kinda "anti-merge". 
}; static String typeToString(Type type) @@ -62,6 +63,7 @@ struct ReplicatedMergeTreeLogEntryData case ReplicatedMergeTreeLogEntryData::ALTER_METADATA: return "ALTER_METADATA"; case ReplicatedMergeTreeLogEntryData::SYNC_PINNED_PART_UUIDS: return "SYNC_PINNED_PART_UUIDS"; case ReplicatedMergeTreeLogEntryData::CLONE_PART_FROM_SHARD: return "CLONE_PART_FROM_SHARD"; + case ReplicatedMergeTreeLogEntryData::PART_IS_LOST: return "PART_IS_LOST"; default: throw Exception("Unknown log entry type: " + DB::toString(type), ErrorCodes::LOGICAL_ERROR); } @@ -140,6 +142,18 @@ struct ReplicatedMergeTreeLogEntryData /// selection of merges. These parts are added to queue.virtual_parts. Strings getVirtualPartNames(MergeTreeDataFormatVersion format_version) const; + /// Returns name of part that will never appear and should be removed from virtual parts set. + /// It's required to correctly cancel merge which cannot be executed, because some source part is lost forever. + /// Do not use it for other purposes, it can be dangerous. + std::optional getAntiVirtualPartName() const + { + if (type == PART_IS_LOST) + return new_part_name; + return {}; + } + + std::optional getDropRange(MergeTreeDataFormatVersion format_version) const; + /// Returns set of parts that denote the block number ranges that should be blocked during the entry execution. /// These parts are added to future_parts. 
Strings getBlockingPartNames(MergeTreeDataFormatVersion format_version) const diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index 30569e53f64..84050f21528 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -33,14 +33,20 @@ ReplicatedMergeTreeQueue::ReplicatedMergeTreeQueue(StorageReplicatedMergeTree & log = &Poco::Logger::get(logger_name); } - -void ReplicatedMergeTreeQueue::initialize(const MergeTreeData::DataParts & parts) +void ReplicatedMergeTreeQueue::clear() { - addVirtualParts(parts); + auto locks = lockQueue(); + assert(future_parts.empty()); + current_parts.clear(); + virtual_parts.clear(); + queue.clear(); + inserts_by_time.clear(); + mutations_by_znode.clear(); + mutations_by_partition.clear(); + mutation_pointer.clear(); } - -void ReplicatedMergeTreeQueue::addVirtualParts(const MergeTreeData::DataParts & parts) +void ReplicatedMergeTreeQueue::initialize(const MergeTreeData::DataParts & parts) { std::lock_guard lock(state_mutex); @@ -74,9 +80,6 @@ bool ReplicatedMergeTreeQueue::load(zkutil::ZooKeeperPtr zookeeper) /// Reset batch size on initialization to recover from possible errors of too large batch size. current_multi_batch_size = 1; - String log_pointer_str = zookeeper->get(replica_path + "/log_pointer"); - log_pointer = log_pointer_str.empty() ? 
0 : parse(log_pointer_str); - std::unordered_set already_loaded_paths; { std::lock_guard lock(state_mutex); @@ -132,6 +135,45 @@ void ReplicatedMergeTreeQueue::insertUnlocked( const LogEntryPtr & entry, std::optional & min_unprocessed_insert_time_changed, std::lock_guard & state_lock) { + if (auto lost_part = entry->getAntiVirtualPartName()) + { + LOG_TRACE(log, "Reading {}: PART_IS_LOST for {}", entry->znode_name, *lost_part); + QueueIters entries = findEntriesByNewPartName(*lost_part); + bool removed = virtual_parts.remove(*lost_part); + if (!removed && !entries.empty()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Found {} entries for lost part {}, " + "but nothing is removed from virtual parts", entries.size(), *lost_part); + for (const auto & entry_it : entries) + { + for (const auto & source_part : (*entry_it)->source_parts) + { + String containing_current_part = current_parts.getContainingPart(source_part); + if (containing_current_part.empty()) /// No current part covers it, so the queue must hold an entry producing source_part itself + { + bool has_source_entry = std::any_of(queue.begin(), queue.end(), [&source_part](const auto & e) + { + return e->new_part_name == source_part; + }); + if (!has_source_entry) + throw Exception(ErrorCodes::LOGICAL_ERROR, + "Source entry ({}) of lost virtual part {} does not exist", + source_part, *lost_part); + } + else + { + if (containing_current_part != source_part) + throw Exception(ErrorCodes::LOGICAL_ERROR, + "Source part {} of lost part {} is covered by current part {}. 
Entry: {}", + source_part, *lost_part, containing_current_part, (*entry_it)->toString()); + } + + virtual_parts.add(source_part); + } + } + + lost_forever_parts.insert(*lost_part); + } + for (const String & virtual_part_name : entry->getVirtualPartNames(format_version)) { virtual_parts.add(virtual_part_name); @@ -228,16 +270,10 @@ void ReplicatedMergeTreeQueue::updateStateOnQueueEntryRemoval( removeCoveredPartsFromMutations(virtual_part_name, /*remove_part = */ false, /*remove_covered_parts = */ true); } - String drop_range_part_name; - if (entry->type == LogEntry::DROP_RANGE) - drop_range_part_name = entry->new_part_name; - else if (entry->type == LogEntry::REPLACE_RANGE) - drop_range_part_name = entry->replace_range_entry->drop_range_part_name; - - if (!drop_range_part_name.empty()) + if (auto drop_range_part_name = entry->getDropRange(format_version)) { - current_parts.remove(drop_range_part_name); - virtual_parts.remove(drop_range_part_name); + current_parts.remove(*drop_range_part_name); + virtual_parts.remove(*drop_range_part_name); } if (entry->type == LogEntry::ALTER_METADATA) @@ -301,10 +337,7 @@ void ReplicatedMergeTreeQueue::addPartToMutations(const String & part_name) { auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version); - /// Do not add special virtual parts to parts_to_do - auto max_level = MergeTreePartInfo::MAX_LEVEL; /// DROP/DETACH PARTITION - auto another_max_level = std::numeric_limits::max(); /// REPLACE/MOVE PARTITION - if (part_info.level == max_level || part_info.level == another_max_level) + if (part_info.isFakeDropRangePart()) return; auto in_partition = mutations_by_partition.find(part_info.partition_id); @@ -392,7 +425,8 @@ void ReplicatedMergeTreeQueue::removeProcessedEntry(zkutil::ZooKeeperPtr zookeep } if (!found && need_remove_from_zk) - throw Exception("Can't find " + entry->znode_name + " in the memory queue. 
It is a bug", ErrorCodes::LOGICAL_ERROR); + throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find {} in the memory queue. It is a bug. Entry info: {}", + entry->znode_name, entry->toString()); notifySubscribers(queue_size); @@ -461,9 +495,162 @@ bool ReplicatedMergeTreeQueue::remove(zkutil::ZooKeeperPtr zookeeper, const Stri return true; } - -bool ReplicatedMergeTreeQueue::removeFromVirtualParts(const MergeTreePartInfo & part_info) +bool ReplicatedMergeTreeQueue::markPartAsLostForever(zkutil::ZooKeeperPtr zookeeper, const String & part_name) { + LogEntry entry; + entry.type = LogEntry::PART_IS_LOST; + entry.source_replica = storage.replica_name; + entry.new_part_name = part_name; + + size_t max_iters = 100; + while (--max_iters) + { + ReplicatedMergeTreeMergePredicate merge_pred = getMergePredicate(zookeeper); + + if (!merge_pred.ensurePartIsLost(part_name, log)) + return false; + + Coordination::Requests ops; + ops.emplace_back(zkutil::makeCheckRequest(zookeeper_path + "/log", merge_pred.getVersion())); + ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/log", "", -1)); + ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential)); + Coordination::Responses responses; + Coordination::Error rc = zookeeper->tryMulti(ops, responses); + + if (rc == Coordination::Error::ZBADVERSION) + { + LOG_TRACE(log, "A new log entry appeared while trying to commit PART_IS_LOST. 
Retry."); + continue; + } else + zkutil::KeeperMultiException::check(rc, ops, responses); + + String path_created = dynamic_cast(*responses.back()).path_created; + LOG_TRACE(log, "Created PART_IS_LOST entry ({}) for part {}", path_created, part_name); + return true; + } + + throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot create PART_IS_LOST entry for {}", part_name); +} + +ReplicatedMergeTreeQueue::QueueIters ReplicatedMergeTreeQueue::findEntriesByNewPartName(const String & part_name) const +{ + QueueIters entries; + size_t get_entries_num = 0; + for (auto it = queue.begin(); it != queue.end(); ++it) + { + if ((*it)->new_part_name == part_name && (*it)->type != LogEntry::PART_IS_LOST) + { + entries.push_back(it); + get_entries_num += (*it)->type == LogEntry::GET_PART; + } + } + + if (!entries.empty()) + { + if (entries.size() != 1) + { + /// Replication queue may contain multiple part producing entries with the same new_part_name + /// if replica was recently cloned. At first cloneReplica(...) copies source replica queue, + /// and after that it reads data parts set from source_path + "/parts" + /// to create GET_PART entries in own queue and download missing parts. + /// Therefore, some MERGE_PART (or MUTATE_PART, or even GET_PART) entry + /// may be duplicated with GET_PART with the same new_part_name. + if (1 < entries.size() - get_entries_num) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Found {} queue entries for {}, only {} are GET_PART", + entries.size(), part_name, get_entries_num); + + LOG_WARNING(log, "Found {} queue entries for {}, including {} GET_PART entries. 
" + "It may rarely happen after replica cloning", entries.size(), part_name, get_entries_num); + } + } + + return entries; +} + +void ReplicatedMergeTreeQueue::executePartIsLost(zkutil::ZooKeeperPtr zookeeper, LogEntry & entry_lost) +{ + LOG_TRACE(log, "Executing {}: PART_IS_LOST for part {}", entry_lost.znode_name, entry_lost.new_part_name); + + /// There might be multiple cancelled parts if replica was cloned... + + size_t queue_size = 0; + std::optional min_unprocessed_insert_time_changed; + std::optional max_processed_insert_time_changed; + Coordination::Requests ops; + { + std::unique_lock lock(state_mutex); + QueueIters cancelled_entries = findEntriesByNewPartName(entry_lost.new_part_name); + + for (const auto entry_it : cancelled_entries) + { + LogEntryPtr canceled_entry; + canceled_entry = *entry_it; + LOG_TRACE(log, "Removing cancelled log entry {}: {}", canceled_entry->znode_name, canceled_entry->toString()); + ops.emplace_back(zkutil::makeRemoveRequest(replica_path + "/queue/" + canceled_entry->znode_name, -1)); + updateStateOnQueueEntryRemoval( + canceled_entry, /* is_successful = */ false, + min_unprocessed_insert_time_changed, max_processed_insert_time_changed, lock); + queue.erase(entry_it); + queue_size = queue.size(); + + LOG_TRACE(log, "Waiting for {} to finish", canceled_entry->znode_name); + canceled_entry->execution_complete.wait(lock, [&canceled_entry] + { return !canceled_entry->currently_executing; }); + } + } + + notifySubscribers(queue_size); + + zookeeper->multi(ops); + + updateTimesInZooKeeper(zookeeper, min_unprocessed_insert_time_changed, max_processed_insert_time_changed); + + { + std::unique_lock lock(state_mutex); + lost_forever_parts.erase(entry_lost.new_part_name); + } + + LOG_TRACE(log, "Executed {}: PART_IS_LOST for part {}", entry_lost.znode_name, entry_lost.new_part_name); +} + +bool ReplicatedMergeTreeMergePredicate::ensurePartIsLost(const String & part_name, Poco::Logger * log) const +{ + String containing_part; + { + 
std::lock_guard lock(queue.state_mutex); + containing_part = queue.virtual_parts.getContainingPart(part_name); + } + + if (containing_part.empty()) + { + LOG_WARNING(log, "Cannot find lost part {} in virtual parts set. " + "Probably it's already marked as lost by another replica.", part_name); + return false; + } + + if (containing_part != part_name) + { + auto info = MergeTreePartInfo::fromPartName(containing_part, queue.format_version); + if (info.isFakeDropRangePart()) + { + /// It does not matter anymore if part is lost, it should be removed anyway + LOG_INFO(log, "Lost part {} is covered by drop range {}, ignoring it", part_name, containing_part); + return false; + } + + /// Lost part is covered by real part. It means that merging/mutating operation + /// was assigned on lost part by some replica. It probably means that part is not lost. + throw Exception(ErrorCodes::LOGICAL_ERROR, "Lost part {} is covered by real part {}", part_name, containing_part); + } + + /// We can mark part as lost + return true; +} + +bool ReplicatedMergeTreeQueue::removeFailedQuorumPart(const MergeTreePartInfo & part_info) +{ + if (part_info.level != 0) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot remove part with non-zero level: {}", part_info.getPartName()); std::lock_guard lock(state_mutex); return virtual_parts.remove(part_info); } @@ -587,8 +774,6 @@ int32_t ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper { std::lock_guard state_lock(state_mutex); - log_pointer = last_entry_index + 1; - for (size_t copied_entry_idx = 0, num_copied_entries = copied_entries.size(); copied_entry_idx < num_copied_entries; ++copied_entry_idx) { String path_created = dynamic_cast(*responses[copied_entry_idx]).path_created; @@ -758,9 +943,7 @@ void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, C /// Such parts do not exist and will never appear, so we should not add virtual parts to parts_to_do list. 
/// Fortunately, it's easy to distinguish virtual parts from normal parts by part level. /// See StorageReplicatedMergeTree::getFakePartCoveringAllPartsInPartition(...) - auto max_level = MergeTreePartInfo::MAX_LEVEL; /// DROP/DETACH PARTITION - auto another_max_level = std::numeric_limits::max(); /// REPLACE/MOVE PARTITION - if (part_info.level == max_level || part_info.level == another_max_level) + if (part_info.isFakeDropRangePart()) continue; auto it = entry->block_numbers.find(part_info.partition_id); @@ -941,9 +1124,6 @@ void ReplicatedMergeTreeQueue::removePartProducingOpsInRange( if ((*it)->currently_executing) to_wait.push_back(*it); auto code = zookeeper->tryRemove(replica_path + "/queue/" + (*it)->znode_name); - /// FIXME it's probably unsafe to remove entries non-atomically - /// when this method called directly from alter query (not from replication queue task), - /// because entries will be lost if ALTER fails. if (code != Coordination::Error::ZOK) LOG_INFO(log, "Couldn't remove {}: {}", replica_path + "/queue/" + (*it)->znode_name, Coordination::errorMessage(code)); @@ -1038,11 +1218,26 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry( || entry.type == LogEntry::ATTACH_PART || entry.type == LogEntry::MUTATE_PART) { + assert(entry.getBlockingPartNames(format_version).size() == 1); for (const String & new_part_name : entry.getBlockingPartNames(format_version)) { if (!isNotCoveredByFuturePartsImpl(entry.znode_name, new_part_name, out_postpone_reason, state_lock)) return false; } + + String covering_part = virtual_parts.getContainingPart(entry.new_part_name); + if (covering_part.empty()) + { + if (lost_forever_parts.count(entry.new_part_name)) + return false; + throw Exception(ErrorCodes::LOGICAL_ERROR, "Part {} not found in virtual parts, but it's not lost. 
" + "Entry {}: {}\nVirtual: {}\nLost:{}", + entry.new_part_name, entry.znode_name, entry.toString(), + fmt::join(virtual_parts.getParts(), ", "), fmt::join(lost_forever_parts, ", ")); + } + + /// NOTE: It's possible that (covering_part == entry.new_part_name), because isNotCoveredByFuturePartsImpl(...) + /// checks for future parts only, not for virtual parts. } /// Check that fetches pool is not overloaded @@ -1259,7 +1454,8 @@ ReplicatedMergeTreeQueue::CurrentlyExecuting::CurrentlyExecuting(const Replicate for (const String & new_part_name : entry->getBlockingPartNames(queue.format_version)) { if (!queue.future_parts.emplace(new_part_name, entry).second) - throw Exception("Tagging already tagged future part " + new_part_name + ". This is a bug.", ErrorCodes::LOGICAL_ERROR); + throw Exception(ErrorCodes::LOGICAL_ERROR, "Tagging already tagged future part {}. This is a bug. " + "It happened on attempt to execute {}: {}", new_part_name, entry->znode_name, entry->toString()); } } @@ -1277,7 +1473,8 @@ void ReplicatedMergeTreeQueue::CurrentlyExecuting::setActualPartName(ReplicatedM return; if (!queue.future_parts.emplace(entry.actual_new_part_name, entry.shared_from_this()).second) - throw Exception("Attaching already existing future part " + entry.actual_new_part_name + ". This is a bug.", ErrorCodes::LOGICAL_ERROR); + throw Exception(ErrorCodes::LOGICAL_ERROR, "Attaching already existing future part {}. This is a bug. " + "It happened on attempt to execute {}: {}", entry.actual_new_part_name, entry.znode_name, entry.toString()); } @@ -1296,13 +1493,19 @@ ReplicatedMergeTreeQueue::CurrentlyExecuting::~CurrentlyExecuting() for (const String & new_part_name : entry->getBlockingPartNames(queue.format_version)) { if (!queue.future_parts.erase(new_part_name)) + { LOG_ERROR(queue.log, "Untagging already untagged future part {}. 
This is a bug.", new_part_name); + assert(false); + } } if (!entry->actual_new_part_name.empty()) { if (entry->actual_new_part_name != entry->new_part_name && !queue.future_parts.erase(entry->actual_new_part_name)) + { LOG_ERROR(queue.log, "Untagging already untagged future part {}. This is a bug.", entry->actual_new_part_name); + assert(false); + } entry->actual_new_part_name.clear(); } diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h index 496f277d132..c1470e8ff01 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h @@ -92,8 +92,8 @@ private: using FuturePartsSet = std::map; FuturePartsSet future_parts; - /// Index of the first log entry that we didn't see yet. - Int64 log_pointer = 0; + using LostPartsSet = std::set; + LostPartsSet lost_forever_parts; /// Avoid parallel execution of queue enties, which may remove other entries from the queue. bool currently_executing_drop_or_replace_range = false; @@ -275,6 +275,7 @@ public: ReplicatedMergeTreeQueue(StorageReplicatedMergeTree & storage_, ReplicatedMergeTreeMergeStrategyPicker & merge_strategy_picker_); ~ReplicatedMergeTreeQueue(); + void clear(); void initialize(const MergeTreeData::DataParts & parts); @@ -289,13 +290,20 @@ public: */ bool remove(zkutil::ZooKeeperPtr zookeeper, const String & part_name); + using QueueIters = std::vector; + QueueIters findEntriesByNewPartName(const String & part_name) const; + + bool markPartAsLostForever(zkutil::ZooKeeperPtr zookeeper, const String & part_name); + + void executePartIsLost(zkutil::ZooKeeperPtr zookeeper, LogEntry & entry_lost); + /** Load (initialize) a queue from ZooKeeper (/replicas/me/queue/). * If queue was not empty load() would not load duplicate records. * return true, if we update queue. 
*/ bool load(zkutil::ZooKeeperPtr zookeeper); - bool removeFromVirtualParts(const MergeTreePartInfo & part_info); + bool removeFailedQuorumPart(const MergeTreePartInfo & part_info); /** Copy the new entries from the shared log to the queue of this replica. Set the log_pointer to the appropriate value. * If watch_callback is not empty, will call it when new entries appear in the log. @@ -478,6 +486,8 @@ public: /// The version of "log" node that is used to check that no new merges have appeared. int32_t getVersion() const { return merges_version; } + bool ensurePartIsLost(const String & part_name, Poco::Logger * log) const; + private: const ReplicatedMergeTreeQueue & queue;