From 6a73c8b49ef59bca82d7e39e57fde16e0d226b2a Mon Sep 17 00:00:00 2001 From: alesapin Date: Wed, 30 Jun 2021 18:24:51 +0300 Subject: [PATCH] Review fixes --- .../ReplicatedMergeTreePartCheckThread.cpp | 24 ++++++++-- .../MergeTree/ReplicatedMergeTreeQueue.cpp | 16 +++++++ .../MergeTree/ReplicatedMergeTreeQueue.h | 3 ++ src/Storages/StorageReplicatedMergeTree.cpp | 48 +++++++++++++++++-- src/Storages/StorageReplicatedMergeTree.h | 2 +- 5 files changed, 86 insertions(+), 7 deletions(-) diff --git a/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp b/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp index d0aaebc0667..35a011a4a58 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp @@ -190,11 +190,25 @@ void ReplicatedMergeTreePartCheckThread::searchForMissingPartAndFetchIfPossible( if (missing_part_search_result == MissingPartSearchResult::LostForever) { - if (!storage.createEmptyPartInsteadOfLost(part_name)) + auto lost_part_info = MergeTreePartInfo::fromPartName(part_name, storage.format_version); + if (lost_part_info.level != 0) { - LOG_WARNING(log, "Cannot create empty part {} instead of lost. Will retry later", part_name); + Strings source_parts; + bool part_in_queue = storage.queue.checkPartInQueueAndGetSourceParts(part_name, source_parts); + + /// If it's MERGE/MUTATION etc. we shouldn't replace result part with empty part + /// because some source parts can be lost, but some of them can exist. + if (part_in_queue && !source_parts.empty()) + { + LOG_ERROR(log, "Part {} found in queue and some source parts for it was lost. Will check all source parts.", part_name); + for (const String & source_part_name : source_parts) + enqueuePart(source_part_name); + + return; + } } - else + + if (storage.createEmptyPartInsteadOfLost(zookeeper, part_name)) { /** This situation is possible if on all the replicas where the part was, it deteriorated. * For example, a replica that has just written it has power turned off and the data has not been written from cache to disk. @@ -202,6 +216,10 @@ void ReplicatedMergeTreePartCheckThread::searchForMissingPartAndFetchIfPossible( LOG_ERROR(log, "Part {} is lost forever.", part_name); ProfileEvents::increment(ProfileEvents::ReplicatedDataLoss); } + else + { + LOG_WARNING(log, "Cannot create empty part {} instead of lost. Will retry later", part_name); + } } } diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index 314605d5c9f..1b0e4ff226a 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -64,6 +64,22 @@ bool ReplicatedMergeTreeQueue::isVirtualPart(const MergeTreeData::DataPartPtr & return !virtual_part_name.empty() && virtual_part_name != data_part->name; } +bool ReplicatedMergeTreeQueue::checkPartInQueueAndGetSourceParts(const String & part_name, Strings & source_parts) const +{ + std::lock_guard lock(state_mutex); + + for (auto it = queue.begin(); it != queue.end(); ++it) + { + if ((*it)->new_part_name == part_name) + { + source_parts.insert(source_parts.end(), (*it)->source_parts.begin(), (*it)->source_parts.end()); + return true; + } + } + + return false; +} + bool ReplicatedMergeTreeQueue::load(zkutil::ZooKeeperPtr zookeeper) { diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h index 439536cf461..0a2c092dfdb 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h @@ -373,6 +373,9 @@ public: /// Checks that part is already in virtual parts bool isVirtualPart(const MergeTreeData::DataPartPtr & data_part) const; + /// Check that part produced by some entry in queue and get source parts for it + bool checkPartInQueueAndGetSourceParts(const String & part_name, Strings & source_parts) const; + /// Check that part isn't in currently generating parts and isn't covered by them and add it to future_parts. /// Locks queue's mutex. bool addFuturePartIfNotCoveredByThem(const String & part_name, LogEntry & entry, String & reject_reason); diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index e9fb3b8ad0e..a04ec5c6532 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -1281,7 +1281,6 @@ void StorageReplicatedMergeTree::syncPinnedPartUUIDs() } } - void StorageReplicatedMergeTree::checkPartChecksumsAndAddCommitOps(const zkutil::ZooKeeperPtr & zookeeper, const DataPartPtr & part, Coordination::Requests & ops, String part_name, NameSet * absent_replicas_paths) { @@ -7396,7 +7395,7 @@ bool StorageReplicatedMergeTree::checkIfDetachedPartitionExists(const String & p } -bool StorageReplicatedMergeTree::createEmptyPartInsteadOfLost(const String & lost_part_name) +bool StorageReplicatedMergeTree::createEmptyPartInsteadOfLost(zkutil::ZooKeeperPtr zookeeper, const String & lost_part_name) { LOG_INFO(log, "Going to replace lost part {} with empty part", lost_part_name); auto metadata_snapshot = getInMemoryMetadataPtr(); @@ -7477,7 +7476,50 @@ bool StorageReplicatedMergeTree::createEmptyPartInsteadOfLost(const String & los MergeTreeData::Transaction transaction(*this); renameTempPartAndReplace(new_data_part, nullptr, &transaction); - checkPartChecksumsAndCommit(transaction, new_data_part); + while (true) + { + + Coordination::Requests ops; + Coordination::Stat replicas_stat; + auto replicas_path = fs::path(zookeeper_path) / "replicas"; + Strings replicas = zookeeper->getChildren(replicas_path, &replicas_stat); + + /// In rare cases new replica can appear during check + ops.emplace_back(zkutil::makeCheckRequest(replicas_path, replicas_stat.version)); + + for (const String & replica : replicas) + { + String current_part_path = fs::path(zookeeper_path) / "replicas" / replica / "parts" / lost_part_name; + + /// We must be sure that this part doesn't exist on other replicas + if (!zookeeper->exists(current_part_path)) + { + ops.emplace_back(zkutil::makeCreateRequest(current_part_path, "", zkutil::CreateMode::Persistent)); + ops.emplace_back(zkutil::makeRemoveRequest(current_part_path, -1)); + } + else + { + throw Exception(ErrorCodes::DUPLICATE_DATA_PART, "Part {} already exists on replica {} on path {}", lost_part_name, replica, current_part_path); + } + } + + getCommitPartOps(ops, new_data_part); + + Coordination::Responses responses; + if (auto code = zookeeper->tryMulti(ops, responses); code == Coordination::Error::ZOK) + { + transaction.commit(); + break; + } + else if (code == Coordination::Error::ZBADVERSION) + { + LOG_INFO(log, "Looks like new replica appearead while creating new empty part, will retry"); + } + else + { + zkutil::KeeperMultiException::check(code, ops, responses); + } + } } catch (const Exception & ex) { diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index 6a3d0b2681b..6f717b7c450 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -258,7 +258,7 @@ public: return replicated_sends_throttler; } - bool createEmptyPartInsteadOfLost(const String & lost_part_name); + bool createEmptyPartInsteadOfLost(zkutil::ZooKeeperPtr zookeeper, const String & lost_part_name); private: std::atomic_bool are_restoring_replica {false};