Review fixes

This commit is contained in:
alesapin 2021-06-30 18:24:51 +03:00
parent 19aeb14dfd
commit 6a73c8b49e
5 changed files with 86 additions and 7 deletions

View File

@ -190,11 +190,25 @@ void ReplicatedMergeTreePartCheckThread::searchForMissingPartAndFetchIfPossible(
if (missing_part_search_result == MissingPartSearchResult::LostForever)
{
if (!storage.createEmptyPartInsteadOfLost(part_name))
auto lost_part_info = MergeTreePartInfo::fromPartName(part_name, storage.format_version);
if (lost_part_info.level != 0)
{
LOG_WARNING(log, "Cannot create empty part {} instead of lost. Will retry later", part_name);
Strings source_parts;
bool part_in_queue = storage.queue.checkPartInQueueAndGetSourceParts(part_name, source_parts);
/// If it's MERGE/MUTATION etc. we shouldn't replace result part with empty part
/// because some source parts can be lost, but some of them can exist.
if (part_in_queue && !source_parts.empty())
{
LOG_ERROR(log, "Part {} found in queue and some source parts for it was lost. Will check all source parts.", part_name);
for (const String & source_part_name : source_parts)
enqueuePart(source_part_name);
return;
}
}
else
if (storage.createEmptyPartInsteadOfLost(zookeeper, part_name))
{
/** This situation is possible if on all the replicas where the part was, it deteriorated.
* For example, a replica that has just written it has power turned off and the data has not been written from cache to disk.
@ -202,6 +216,10 @@ void ReplicatedMergeTreePartCheckThread::searchForMissingPartAndFetchIfPossible(
LOG_ERROR(log, "Part {} is lost forever.", part_name);
ProfileEvents::increment(ProfileEvents::ReplicatedDataLoss);
}
else
{
LOG_WARNING(log, "Cannot create empty part {} instead of lost. Will retry later", part_name);
}
}
}

View File

@ -64,6 +64,22 @@ bool ReplicatedMergeTreeQueue::isVirtualPart(const MergeTreeData::DataPartPtr &
return !virtual_part_name.empty() && virtual_part_name != data_part->name;
}
/// Looks for a queue entry whose resulting part is `part_name`.
/// On the first match, appends that entry's source parts to `source_parts` and returns true.
/// Returns false (leaving `source_parts` untouched) when no entry produces this part.
/// state_mutex is held for the whole scan.
bool ReplicatedMergeTreeQueue::checkPartInQueueAndGetSourceParts(const String & part_name, Strings & source_parts) const
{
    std::lock_guard lock(state_mutex);

    for (const auto & entry : queue)
    {
        /// Skip entries that produce some other part.
        if (!(entry->new_part_name == part_name))
            continue;

        const auto & entry_sources = entry->source_parts;
        source_parts.insert(source_parts.end(), entry_sources.begin(), entry_sources.end());
        return true;
    }

    return false;
}
bool ReplicatedMergeTreeQueue::load(zkutil::ZooKeeperPtr zookeeper)
{

View File

@ -373,6 +373,9 @@ public:
/// Checks that part is already in virtual parts
bool isVirtualPart(const MergeTreeData::DataPartPtr & data_part) const;
/// Checks whether some entry in the queue produces the part and, if so, appends that entry's source parts to source_parts.
bool checkPartInQueueAndGetSourceParts(const String & part_name, Strings & source_parts) const;
/// Check that part isn't in currently generating parts and isn't covered by them and add it to future_parts.
/// Locks queue's mutex.
bool addFuturePartIfNotCoveredByThem(const String & part_name, LogEntry & entry, String & reject_reason);

View File

@ -1281,7 +1281,6 @@ void StorageReplicatedMergeTree::syncPinnedPartUUIDs()
}
}
void StorageReplicatedMergeTree::checkPartChecksumsAndAddCommitOps(const zkutil::ZooKeeperPtr & zookeeper,
const DataPartPtr & part, Coordination::Requests & ops, String part_name, NameSet * absent_replicas_paths)
{
@ -7396,7 +7395,7 @@ bool StorageReplicatedMergeTree::checkIfDetachedPartitionExists(const String & p
}
bool StorageReplicatedMergeTree::createEmptyPartInsteadOfLost(const String & lost_part_name)
bool StorageReplicatedMergeTree::createEmptyPartInsteadOfLost(zkutil::ZooKeeperPtr zookeeper, const String & lost_part_name)
{
LOG_INFO(log, "Going to replace lost part {} with empty part", lost_part_name);
auto metadata_snapshot = getInMemoryMetadataPtr();
@ -7477,7 +7476,50 @@ bool StorageReplicatedMergeTree::createEmptyPartInsteadOfLost(const String & los
MergeTreeData::Transaction transaction(*this);
renameTempPartAndReplace(new_data_part, nullptr, &transaction);
checkPartChecksumsAndCommit(transaction, new_data_part);
while (true)
{
Coordination::Requests ops;
Coordination::Stat replicas_stat;
auto replicas_path = fs::path(zookeeper_path) / "replicas";
Strings replicas = zookeeper->getChildren(replicas_path, &replicas_stat);
/// In rare cases new replica can appear during check
ops.emplace_back(zkutil::makeCheckRequest(replicas_path, replicas_stat.version));
for (const String & replica : replicas)
{
String current_part_path = fs::path(zookeeper_path) / "replicas" / replica / "parts" / lost_part_name;
/// We must be sure that this part doesn't exist on other replicas
if (!zookeeper->exists(current_part_path))
{
ops.emplace_back(zkutil::makeCreateRequest(current_part_path, "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeRemoveRequest(current_part_path, -1));
}
else
{
throw Exception(ErrorCodes::DUPLICATE_DATA_PART, "Part {} already exists on replica {} on path {}", lost_part_name, replica, current_part_path);
}
}
getCommitPartOps(ops, new_data_part);
Coordination::Responses responses;
if (auto code = zookeeper->tryMulti(ops, responses); code == Coordination::Error::ZOK)
{
transaction.commit();
break;
}
else if (code == Coordination::Error::ZBADVERSION)
{
LOG_INFO(log, "Looks like new replica appearead while creating new empty part, will retry");
}
else
{
zkutil::KeeperMultiException::check(code, ops, responses);
}
}
}
catch (const Exception & ex)
{

View File

@ -258,7 +258,7 @@ public:
return replicated_sends_throttler;
}
bool createEmptyPartInsteadOfLost(const String & lost_part_name);
bool createEmptyPartInsteadOfLost(zkutil::ZooKeeperPtr zookeeper, const String & lost_part_name);
private:
std::atomic_bool are_restoring_replica {false};