diff --git a/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp b/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp index d170ba835cb..92c120d6d9f 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp @@ -190,11 +190,9 @@ void ReplicatedMergeTreePartCheckThread::searchForMissingPartAndFetchIfPossible( if (missing_part_search_result == MissingPartSearchResult::LostForever) { - /// Is it in the replication queue? If there is - delete, because the task can not be processed. - if (!storage.queue.remove(zookeeper, part_name)) + if (!storage.createEmptyPartInsteadOfLost(part_name)) { - /// The part was not in our queue. - LOG_WARNING(log, "Missing part {} is not in our queue, this can happen rarely.", part_name); + LOG_WARNING(log, "Cannot create empty part {} instead of lost. Will retry later", part_name); } /** This situation is possible if on all the replicas where the part was, it deteriorated. 
diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index a14b6119f38..314605d5c9f 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -410,62 +410,6 @@ void ReplicatedMergeTreeQueue::removeProcessedEntry(zkutil::ZooKeeperPtr zookeep updateTimesInZooKeeper(zookeeper, min_unprocessed_insert_time_changed, max_processed_insert_time_changed); } - -bool ReplicatedMergeTreeQueue::remove(zkutil::ZooKeeperPtr zookeeper, const String & part_name) -{ - LogEntryPtr found; - size_t queue_size = 0; - - std::optional min_unprocessed_insert_time_changed; - std::optional max_processed_insert_time_changed; - - { - std::unique_lock lock(state_mutex); - - bool removed = virtual_parts.remove(part_name); - - for (Queue::iterator it = queue.begin(); it != queue.end();) - { - if ((*it)->new_part_name == part_name) - { - found = *it; - if (removed) - { - /// Preserve invariant `virtual_parts` = `current_parts` + `queue`. - /// We remove new_part from virtual parts and add all source parts - /// which present in current_parts. 
- for (const auto & source_part : found->source_parts) - { - auto part_in_current_parts = current_parts.getContainingPart(source_part); - if (part_in_current_parts == source_part) - virtual_parts.add(source_part, nullptr, log); - } - } - - updateStateOnQueueEntryRemoval( - found, /* is_successful = */ false, - min_unprocessed_insert_time_changed, max_processed_insert_time_changed, lock); - queue.erase(it++); - queue_size = queue.size(); - break; - } - else - ++it; - } - } - - if (!found) - return false; - - notifySubscribers(queue_size); - - zookeeper->tryRemove(fs::path(replica_path) / "queue" / found->znode_name); - updateTimesInZooKeeper(zookeeper, min_unprocessed_insert_time_changed, max_processed_insert_time_changed); - - return true; -} - - bool ReplicatedMergeTreeQueue::removeFailedQuorumPart(const MergeTreePartInfo & part_info) { assert(part_info.level == 0); diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h index 078795472bf..439536cf461 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h @@ -281,11 +281,6 @@ public: */ void insert(zkutil::ZooKeeperPtr zookeeper, LogEntryPtr & entry); - /** Delete the action with the specified part (as new_part_name) from the queue. - * Called for unreachable actions in the queue - old lost parts. - */ - bool remove(zkutil::ZooKeeperPtr zookeeper, const String & part_name); - /** Load (initialize) a queue from ZooKeeper (/replicas/me/queue/). * If queue was not empty load() would not load duplicate records. * return true, if we update queue. 
diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp
index 465aa7e42bc..5049b364dc6 100644
--- a/src/Storages/StorageReplicatedMergeTree.cpp
+++ b/src/Storages/StorageReplicatedMergeTree.cpp
@@ -17,6 +17,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -7385,4 +7386,116 @@ bool StorageReplicatedMergeTree::checkIfDetachedPartitionExists(const String & p
     }
     return false;
 }
+
+
+/// Replaces the lost part `lost_part_name` with an empty part of the same name.
+///
+/// The empty part is built from the sample block of the current metadata snapshot, written into a
+/// temporary directory ("tmp_empty_" + part name), then renamed into place and committed.
+///
+/// Returns true if the empty part was committed. Returns false if the partition contains no parts
+/// to take the partition value from, or if the commit threw an Exception. Exceptions thrown
+/// while reserving space or writing the temporary part are not caught here.
+bool StorageReplicatedMergeTree::createEmptyPartInsteadOfLost(const String & lost_part_name)
+{
+    LOG_INFO(log, "Going to replace lost part {} with empty part", lost_part_name);
+
+    constexpr static auto TMP_PREFIX = "tmp_empty_";
+
+    /// One snapshot of metadata and settings is used for the whole operation.
+    auto metadata_snapshot = getInMemoryMetadataPtr();
+    auto storage_settings = getSettings();
+
+    auto new_part_info = MergeTreePartInfo::fromPartName(lost_part_name, format_version);
+
+    /// The partition value is copied from a part that already exists in the same partition,
+    /// so check for one first: without it nothing can be created and no space has to be reserved.
+    /// NOTE(review): no locking of the parts set is visible in this function - confirm that reading
+    /// this range here is safe with respect to concurrent changes.
+    auto parts_in_partition = getDataPartsPartitionRange(new_part_info.partition_id);
+    if (parts_in_partition.empty())
+    {
+        LOG_WARNING(log, "Empty part {} is not created instead of lost part because there is no parts in partition {}, just DROP this partition.", lost_part_name, new_part_info.partition_id);
+        return false;
+    }
+
+    /// Copy the value right away instead of holding on to the range.
+    auto partition_value = (*parts_in_partition.begin())->partition;
+
+    auto block = metadata_snapshot->getSampleBlock();
+    NamesAndTypesList columns = metadata_snapshot->getColumns().getAllPhysical().filter(block.getNames());
+
+    DB::IMergeTreeDataPart::TTLInfos move_ttl_infos;
+    ReservationPtr reservation = reserveSpacePreferringTTLRules(metadata_snapshot, 0, move_ttl_infos, time(nullptr), 0, true);
+    VolumePtr volume = getStoragePolicy()->getVolume(0);
+
+    IMergeTreeDataPart::MinMaxIndex minmax_idx;
+    minmax_idx.update(block, getMinMaxColumnsNames(metadata_snapshot->getPartitionKey()));
+
+    auto new_data_part = createPart(
+        lost_part_name,
+        choosePartType(0, block.rows()),
+        new_part_info,
+        createVolumeFromReservation(reservation, volume),
+        TMP_PREFIX + lost_part_name);
+
+    if (storage_settings->assign_part_uuids)
+        new_data_part->uuid = UUIDHelpers::generateV4();
+
+    new_data_part->setColumns(columns);
+    new_data_part->rows_count = block.rows();
+    new_data_part->partition = std::move(partition_value);
+    new_data_part->minmax_idx = std::move(minmax_idx);
+    new_data_part->is_temp = true;
+
+    SyncGuardPtr sync_guard;
+    if (new_data_part->isStoredOnDisk())
+    {
+        const auto disk = new_data_part->volume->getDisk();
+        String full_path = new_data_part->getFullRelativePath();
+
+        /// The name could be non-unique in case of stale files from previous runs.
+        if (disk->exists(full_path))
+        {
+            LOG_WARNING(log, "Removing old temporary directory {}", fullPath(disk, full_path));
+            disk->removeRecursive(full_path);
+        }
+
+        disk->createDirectories(full_path);
+
+        if (storage_settings->fsync_part_directory)
+            sync_guard = disk->getDirectorySyncGuard(full_path);
+    }
+
+    /// This effectively chooses minimal compression method:
+    /// either default lz4 or compression method with zero thresholds on absolute and relative part size.
+    auto compression_codec = getContext()->chooseCompressionCodec(0, 0);
+
+    const auto & index_factory = MergeTreeIndexFactory::instance();
+    MergedBlockOutputStream out(new_data_part, metadata_snapshot, columns, index_factory.getMany(metadata_snapshot->getSecondaryIndices()), compression_codec);
+    bool sync_on_insert = storage_settings->fsync_after_insert;
+
+    out.writePrefix();
+    out.write(block);
+    out.writeSuffixAndFinalizePart(new_data_part, sync_on_insert);
+
+    /// Only failures of the rename/commit step are turned into a `false` result.
+    try
+    {
+        MergeTreeData::Transaction transaction(*this);
+        renameTempPartAndReplace(new_data_part, nullptr, &transaction);
+
+        checkPartChecksumsAndCommit(transaction, new_data_part);
+    }
+    catch (const Exception & ex)
+    {
+        LOG_WARNING(log, "Cannot commit empty part {} with error {}", lost_part_name, ex.displayText());
+        return false;
+    }
+
+    LOG_INFO(log, "Created empty part {} instead of lost part", lost_part_name);
+
+    return true;
+}
+
 }
diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h
index 205dc9687c7..6a3d0b2681b 100644
--- a/src/Storages/StorageReplicatedMergeTree.h
+++ b/src/Storages/StorageReplicatedMergeTree.h
@@ -258,6 +258,8 @@ public:
         return replicated_sends_throttler;
     }
 
+    bool createEmptyPartInsteadOfLost(const String & lost_part_name);
+
 private:
     std::atomic_bool are_restoring_replica {false};
 