Fix rare race which can lead to queue hang

alesapin 2023-01-02 16:57:25 +01:00
parent f7fbaf9c99
commit 6fc63878b4
2 changed files with 53 additions and 13 deletions


@@ -179,9 +179,14 @@ ReplicatedMergeTreePartCheckThread::MissingPartSearchResult ReplicatedMergeTreeP
             found_part_with_the_same_min_block = true;
             parts_found.push_back(part_on_replica);
         }
         if (part_on_replica_info.max_block == part_info.max_block)
         {
             found_part_with_the_same_max_block = true;
-            parts_found.push_back(part_on_replica);
+            /// If we are looking for a part like partition_X_X_level, we could add the part
+            /// partition_X_X_(level-1) twice; avoid that.
+            if (parts_found.empty() || parts_found.back() != part_on_replica)
+                parts_found.push_back(part_on_replica);
         }
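
The guard in this hunk matters when the part being searched for covers a single block (min_block == max_block): one candidate part on a replica can then satisfy both the min_block and the max_block checks and, without the guard, would be appended to parts_found twice. Below is a minimal standalone sketch of that scenario; PartInfo here is a simplified stand-in for ClickHouse's real part-info type, not the actual class.

#include <cassert>
#include <string>
#include <vector>

/// Simplified stand-in for the real part-info type; illustrative only.
struct PartInfo
{
    int min_block;
    int max_block;
};

int main()
{
    /// We are searching for a single-block part, e.g. all_5_5_1.
    PartInfo part_info{5, 5};

    /// A replica has all_5_5_0, which matches on both min_block and max_block.
    std::string part_on_replica = "all_5_5_0";
    PartInfo part_on_replica_info{5, 5};

    std::vector<std::string> parts_found;

    if (part_on_replica_info.min_block == part_info.min_block)
        parts_found.push_back(part_on_replica);

    if (part_on_replica_info.max_block == part_info.max_block)
    {
        /// Without this check the same part would be recorded twice.
        if (parts_found.empty() || parts_found.back() != part_on_replica)
            parts_found.push_back(part_on_replica);
    }

    assert(parts_found.size() == 1);
    return 0;
}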


@@ -3498,21 +3498,55 @@ void StorageReplicatedMergeTree::removePartAndEnqueueFetch(const String & part_n
         removePartFromZooKeeper(part_name, ops, stat.numChildren > 0);
     }

+    while (true)
+    {
+        /// We use the merge predicate + a version check here, because DROP RANGE updates the log version and we are trying to avoid a race with it.
+        /// We must be sure that our part was not dropped, otherwise we will have a fetch entry but no virtual part for it (DROP RANGE will remove it).
+        /// The bad sequence is the following:
+        /// 1) Create DROP PART in the log for broken_part (for example, because it's not only broken but also empty)
+        /// 1.5) Pull the entry into the queue and start execution (removePartProducingOpsInRange)
+        /// 2) Execute this method (create GET PART for the broken part)
+        /// 3) Finish DROP PART execution (remove the part from virtual parts)
+        /// 4) Now we get GET PART for the broken part, which is absent in virtual parts == bug
+        ///
+        /// Because of the version check, this method will never create a FETCH entry if a drop part exists.
+        ReplicatedMergeTreeMergePredicate merge_pred = queue.getMergePredicate(zookeeper, PartitionIdsHint{broken_part_info.partition_id});
+        if (merge_pred.hasDropRange(broken_part_info))
+        {
+            LOG_INFO(log, "Broken part is covered by DROP RANGE, don't need to fetch it");
+            return;
+        }

         LogEntryPtr log_entry = std::make_shared<LogEntry>();
         log_entry->type = LogEntry::GET_PART;
         log_entry->create_time = part_create_time;
         log_entry->source_replica = "";
         log_entry->new_part_name = part_name;

+        /// Check that our version of the log (and queue) is the freshest one. Otherwise, don't create a new fetch entry.
+        ops.emplace_back(zkutil::makeCheckRequest(fs::path(zookeeper_path) / "log", merge_pred.getVersion()));
         ops.emplace_back(zkutil::makeCreateRequest(
             fs::path(replica_path) / "queue/queue-", log_entry->toString(),
             zkutil::CreateMode::PersistentSequential));

-        auto results = zookeeper->multi(ops);
+        Coordination::Responses results;
+        auto rc = zookeeper->tryMulti(ops, results);
+
+        if (rc == Coordination::Error::ZBADVERSION)
+        {
+            LOG_TRACE(log, "Log updated, cannot create fetch entry for part {}, will retry.", part_name);
+            continue;
+        }
+        else
+            zkutil::KeeperMultiException::check(rc, ops, results);

         String path_created = dynamic_cast<const Coordination::CreateResponse &>(*results.back()).path_created;
         log_entry->znode_name = path_created.substr(path_created.find_last_of('/') + 1);

         queue.insert(zookeeper, log_entry);
+        break;
+    }
 }
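
The loop above is an optimistic-concurrency pattern: snapshot the log version together with the drop-range check, commit the fetch entry atomically with a version check, and retry from a fresh snapshot whenever the version has moved. The sketch below shows that pattern in a self-contained form; VersionedLog and enqueueFetch are hypothetical stand-ins for the ZooKeeper log znode and removePartAndEnqueueFetch, not ClickHouse's actual API.

#include <cstdio>
#include <mutex>

/// Hypothetical stand-in for the ZooKeeper "log" znode: writers bump `version`,
/// and DROP RANGE execution may set `part_dropped`.
struct VersionedLog
{
    std::mutex mutex;
    int version = 0;
    bool part_dropped = false;

    /// Analogue of tryMulti(makeCheckRequest(log, version), makeCreateRequest(...)):
    /// the commit succeeds only if the version is unchanged (else "ZBADVERSION").
    bool tryCommitFetchEntry(int expected_version)
    {
        std::lock_guard<std::mutex> lock(mutex);
        return version == expected_version;
    }
};

/// Analogue of the retry loop in removePartAndEnqueueFetch.
bool enqueueFetch(VersionedLog & log)
{
    while (true)
    {
        int observed_version;
        bool dropped;
        {
            std::lock_guard<std::mutex> lock(log.mutex);
            observed_version = log.version;  /// merge_pred.getVersion()
            dropped = log.part_dropped;      /// merge_pred.hasDropRange(...)
        }

        if (dropped)
            return false;  /// part is covered by DROP RANGE: no fetch needed

        if (log.tryCommitFetchEntry(observed_version))
            return true;   /// entry created atomically with the version check

        /// The version moved between the snapshot and the commit: a DROP RANGE
        /// may have appeared in between, so re-check from scratch.
        std::printf("log version changed, retrying\n");
    }
}

int main()
{
    VersionedLog log;
    return enqueueFetch(log) ? 0 : 1;
}

The key property is that the drop-range check and the queue insertion cannot interleave with a log update: either the multi-request sees the same version it was predicated on, or it fails with a version conflict and the check is redone.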
@@ -7448,6 +7482,7 @@ bool StorageReplicatedMergeTree::dropPartImpl(
         String log_znode_path = dynamic_cast<const Coordination::CreateResponse &>(*responses[clear_block_ops_size + 1]).path_created;
         entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
+        LOG_TRACE(log, "DROP RANGE for part {} inserted with znode name {}", part_name, entry.znode_name);

         return true;
     }
 }