From e3c798bae5f204208048f336fa842daad65404eb Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Wed, 22 Mar 2023 23:46:15 +0100 Subject: [PATCH] fix terminate in parts check thread --- .../ReplicatedMergeTreePartCheckThread.cpp | 73 +++++++++---------- 1 file changed, 36 insertions(+), 37 deletions(-) diff --git a/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp b/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp index 8eafc54cb4c..27ab56d4773 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp @@ -76,50 +76,49 @@ std::unique_lock ReplicatedMergeTreePartCheckThread::pausePartsCheck void ReplicatedMergeTreePartCheckThread::cancelRemovedPartsCheck(const MergeTreePartInfo & drop_range_info) { - Strings removed_names; + Strings parts_to_remove; { std::lock_guard lock(parts_mutex); - removed_names.reserve(parts_queue.size()); /// Avoid memory limit in the middle - for (auto it = parts_queue.begin(); it != parts_queue.end();) - { - if (drop_range_info.contains(MergeTreePartInfo::fromPartName(it->first, storage.format_version))) - { - /// Remove part from the queue to avoid part resurrection - /// if we will check it and enqueue fetch after DROP/REPLACE execution. - removed_names.push_back(it->first); - parts_set.erase(it->first); - it = parts_queue.erase(it); - } - else - { - ++it; - } - } + for (const auto & elem : parts_queue) + if (drop_range_info.contains(MergeTreePartInfo::fromPartName(elem.first, storage.format_version))) + parts_to_remove.push_back(elem.first); } - /// This filtering is not necessary - auto new_end = std::remove_if(removed_names.begin(), removed_names.end(), [this](const String & part_name) + /// We have to remove parts that were not removed by removePartAndEnqueueFetch + LOG_INFO(log, "Removing broken parts from ZooKeeper: {}", fmt::join(parts_to_remove, ", ")); + storage.removePartsFromZooKeeperWithRetries(parts_to_remove); /// May throw + + /// Now we can remove parts from the check queue. + /// It's not atomic (because it's bad idea to hold the mutex while removing something from zk with retries), + /// but the check thread is currently paused, and no new parts in drop_range_info can by enqueued + /// while the corresponding DROP_RANGE/REPLACE_RANGE exists, so it should be okay. We will recheck it just in case. + + StringSet removed_parts; + for (auto & part : parts_to_remove) + removed_parts.emplace(std::move(part)); + size_t count = 0; + + std::lock_guard lock(parts_mutex); + for (const auto & elem : parts_queue) { - auto part = storage.getPartIfExists(part_name, {MergeTreeDataPartState::Active, MergeTreeDataPartState::Outdated, MergeTreeDataPartState::Deleting}); - /// The rest of parts will be removed normally - return part && !part->outdated_because_broken; + bool is_removed = removed_parts.contains(elem.first); + bool should_have_been_removed = drop_range_info.contains(MergeTreePartInfo::fromPartName(elem.first, storage.format_version)); + if (is_removed != should_have_been_removed) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Inconsistent parts_queue: name={}, is_removed={}, should_have_been_removed={}", + elem.first, is_removed, should_have_been_removed); + count += is_removed; + } + + if (count != parts_to_remove.size()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected number of parts to remove from parts_queue: should be {}, got {}", + parts_to_remove.size(), count); + + auto new_end = std::remove_if(parts_queue.begin(), parts_queue.end(), [&removed_parts] (const auto & elem) + { + return removed_parts.contains(elem.first); }); - removed_names.erase(new_end, removed_names.end()); - if (removed_names.empty()) - return; - try - { - /// We have to remove parts that were not removed by removePartAndEnqueueFetch - LOG_INFO(log, "Removing broken parts from ZooKeeper: {}", fmt::join(removed_names, ", ")); - storage.removePartsFromZooKeeperWithRetries(removed_names, /* max_retries */ 100); - } - catch (...) - { - /// It's highly unlikely to happen on normal use cases. And if it happens it's easier to restart and reinitialize - LOG_FATAL(log, "Failed to remove parts [{}] from ZooKeeper: {}", fmt::join(removed_names, ", "), getCurrentExceptionMessage(/* with_stacktrace = */ true)); - std::terminate(); - } + parts_queue.erase(new_end, parts_queue.end()); } size_t ReplicatedMergeTreePartCheckThread::size() const