fix terminate in parts check thread

This commit is contained in:
Alexander Tokmakov 2023-03-22 23:46:15 +01:00
parent 0989dd837a
commit e3c798bae5

View File

@ -76,50 +76,49 @@ std::unique_lock<std::mutex> ReplicatedMergeTreePartCheckThread::pausePartsCheck
void ReplicatedMergeTreePartCheckThread::cancelRemovedPartsCheck(const MergeTreePartInfo & drop_range_info)
{
Strings removed_names;
Strings parts_to_remove;
{
std::lock_guard lock(parts_mutex);
removed_names.reserve(parts_queue.size()); /// Avoid memory limit in the middle
for (auto it = parts_queue.begin(); it != parts_queue.end();)
{
if (drop_range_info.contains(MergeTreePartInfo::fromPartName(it->first, storage.format_version)))
{
/// Remove part from the queue to avoid part resurrection
/// if we will check it and enqueue fetch after DROP/REPLACE execution.
removed_names.push_back(it->first);
parts_set.erase(it->first);
it = parts_queue.erase(it);
}
else
{
++it;
}
}
for (const auto & elem : parts_queue)
if (drop_range_info.contains(MergeTreePartInfo::fromPartName(elem.first, storage.format_version)))
parts_to_remove.push_back(elem.first);
}
/// This filtering is not necessary
auto new_end = std::remove_if(removed_names.begin(), removed_names.end(), [this](const String & part_name)
/// We have to remove parts that were not removed by removePartAndEnqueueFetch
LOG_INFO(log, "Removing broken parts from ZooKeeper: {}", fmt::join(parts_to_remove, ", "));
storage.removePartsFromZooKeeperWithRetries(parts_to_remove); /// May throw
/// Now we can remove parts from the check queue.
/// It's not atomic (because it's bad idea to hold the mutex while removing something from zk with retries),
/// but the check thread is currently paused, and no new parts in drop_range_info can by enqueued
/// while the corresponding DROP_RANGE/REPLACE_RANGE exists, so it should be okay. We will recheck it just in case.
StringSet removed_parts;
for (auto & part : parts_to_remove)
removed_parts.emplace(std::move(part));
size_t count = 0;
std::lock_guard lock(parts_mutex);
for (const auto & elem : parts_queue)
{
auto part = storage.getPartIfExists(part_name, {MergeTreeDataPartState::Active, MergeTreeDataPartState::Outdated, MergeTreeDataPartState::Deleting});
/// The rest of parts will be removed normally
return part && !part->outdated_because_broken;
bool is_removed = removed_parts.contains(elem.first);
bool should_have_been_removed = drop_range_info.contains(MergeTreePartInfo::fromPartName(elem.first, storage.format_version));
if (is_removed != should_have_been_removed)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Inconsistent parts_queue: name={}, is_removed={}, should_have_been_removed={}",
elem.first, is_removed, should_have_been_removed);
count += is_removed;
}
if (count != parts_to_remove.size())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected number of parts to remove from parts_queue: should be {}, got {}",
parts_to_remove.size(), count);
auto new_end = std::remove_if(parts_queue.begin(), parts_queue.end(), [&removed_parts] (const auto & elem)
{
return removed_parts.contains(elem.first);
});
removed_names.erase(new_end, removed_names.end());
if (removed_names.empty())
return;
try
{
/// We have to remove parts that were not removed by removePartAndEnqueueFetch
LOG_INFO(log, "Removing broken parts from ZooKeeper: {}", fmt::join(removed_names, ", "));
storage.removePartsFromZooKeeperWithRetries(removed_names, /* max_retries */ 100);
}
catch (...)
{
/// It's highly unlikely to happen on normal use cases. And if it happens it's easier to restart and reinitialize
LOG_FATAL(log, "Failed to remove parts [{}] from ZooKeeper: {}", fmt::join(removed_names, ", "), getCurrentExceptionMessage(/* with_stacktrace = */ true));
std::terminate();
}
parts_queue.erase(new_end, parts_queue.end());
}
size_t ReplicatedMergeTreePartCheckThread::size() const