From 04875a022a031ac763df2f31cec0549cb5dfd79c Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Tue, 29 Mar 2016 22:39:44 +0300 Subject: [PATCH] Removed code for freezing partitions due to unacceptable implementation [#METR-20656]. --- .../Storages/MergeTree/MergeTreeDataMerger.h | 7 --- .../DB/Storages/MergeTree/ReshardingWorker.h | 10 ---- .../MergeTree/MergeTreeDataMerger.cpp | 52 ------------------- .../Storages/MergeTree/ReshardingWorker.cpp | 30 +---------- .../Storages/StorageReplicatedMergeTree.cpp | 8 --- 5 files changed, 2 insertions(+), 105 deletions(-) diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeDataMerger.h b/dbms/include/DB/Storages/MergeTree/MergeTreeDataMerger.h index 1a6a8a881cd..b8319fad14d 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeDataMerger.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeDataMerger.h @@ -63,9 +63,6 @@ public: /// Примерное количество места на диске, нужное для мерджа. С запасом. static size_t estimateDiskSpaceForMerge(const MergeTreeData::DataPartsVector & parts); - void freezePartition(const std::string & partition); - void unfreezePartition(const std::string & partition); - /** Отменяет все мерджи. Все выполняющиеся сейчас вызовы mergeParts скоро бросят исключение. * Все новые вызовы будут бросать исключения, пока не будет вызван uncancel(). */ @@ -79,7 +76,6 @@ private: /** Выбрать все куски принадлежащие одной партиции. */ MergeTreeData::DataPartsVector selectAllPartsFromPartition(DayNum_t partition); - bool isPartitionFrozen(const MergeTreeData::DataPart & part) const; private: using FrozenPartitions = std::unordered_set; @@ -92,9 +88,6 @@ private: /// Когда в последний раз писали в лог, что место на диске кончилось (чтобы не писать об этом слишком часто). time_t disk_space_warning_time = 0; - std::mutex freeze_lock; - FrozenPartitions frozen_partitions; - CancellationHook cancellation_hook; std::atomic cancelled {false}; diff --git a/dbms/include/DB/Storages/MergeTree/ReshardingWorker.h b/dbms/include/DB/Storages/MergeTree/ReshardingWorker.h index c1c3f45940a..8ae6fe2de1c 100644 --- a/dbms/include/DB/Storages/MergeTree/ReshardingWorker.h +++ b/dbms/include/DB/Storages/MergeTree/ReshardingWorker.h @@ -294,16 +294,6 @@ private: zkutil::SingleBarrier getElectionBarrier(); zkutil::SingleBarrier getCommitBarrier(); - /// Prevent merging jobs from being performed on the partition that we - /// want to reshard on the current host. This operation is persistent: - /// even if a node failure occurred, a partition remains frozen as long - /// as it is not unfrozen explicitely. So use it with care as regards - /// exceptions. - void freezeSourcePartition(); - - /// Make the partition that we want to reshard available for merging jobs. - void unfreezeSourcePartition(); - /// Get the ZooKeeper path of a given coordinator. std::string getCoordinatorPath(const std::string & coordinator_id) const; /// Get the ZooKeeper path of a given job partition. diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp index ac295b676fe..a4ba04f6f08 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp @@ -83,19 +83,6 @@ bool MergeTreeDataMerger::selectPartsToMerge(MergeTreeData::DataPartsVector & pa { MergeTreeData::DataParts data_parts = data.getDataParts(); - { - std::lock_guard guard{freeze_lock}; - - for (auto it = data_parts.begin(); it != data_parts.end(); ) - { - const MergeTreeData::DataPartPtr & part = *it; - if (isPartitionFrozen(*part)) - it = data_parts.erase(it); - else - ++it; - } - } - if (data_parts.empty()) return false; @@ -351,28 +338,6 @@ MergeTreeData::DataPartPtr MergeTreeDataMerger::mergeParts( size_t aio_threshold, MergeTreeData::Transaction * out_transaction, DiskSpaceMonitor::Reservation * disk_reservation) { - std::lock_guard guard{freeze_lock}; - - if (!frozen_partitions.empty()) - { - size_t old_parts_size = parts.size(); - - auto first_removed = std::remove_if(parts.begin(), parts.end(), [&](const MergeTreeData::DataPartPtr & part) - { - return isPartitionFrozen(*part); - }); - parts.erase(first_removed, parts.end()); - - if (parts.empty()) - throw Exception("All the chosen parts lie inside a frozen partition. Cancelling.", ErrorCodes::ABORTED); - - if (disk_reservation && (parts.size() != old_parts_size)) - { - size_t sum_parts_size_in_bytes = MergeTreeDataMerger::estimateDiskSpaceForMerge(parts); - disk_reservation = DiskSpaceMonitor::reserve(data.getFullPath(), sum_parts_size_in_bytes); - } - } - merge_entry->num_parts = parts.size(); LOG_DEBUG(log, "Merging " << parts.size() << " parts: from " << parts.front()->name << " to " << parts.back()->name << " into " << merged_name); @@ -821,23 +786,6 @@ size_t MergeTreeDataMerger::estimateDiskSpaceForMerge(const MergeTreeData::DataP return static_cast(res * DISK_USAGE_COEFFICIENT_TO_RESERVE); } -void MergeTreeDataMerger::freezePartition(const std::string & partition) -{ - std::lock_guard guard{freeze_lock}; - frozen_partitions.insert(MergeTreeData::getMonthFromName(partition)); -} - -void MergeTreeDataMerger::unfreezePartition(const std::string & partition) -{ - std::lock_guard guard{freeze_lock}; - frozen_partitions.erase(MergeTreeData::getMonthFromName(partition)); -} - -bool MergeTreeDataMerger::isPartitionFrozen(const MergeTreeData::DataPart & part) const -{ - return frozen_partitions.count(part.month); -} - void MergeTreeDataMerger::abortIfRequested() { if (cancelled) diff --git a/dbms/src/Storages/MergeTree/ReshardingWorker.cpp b/dbms/src/Storages/MergeTree/ReshardingWorker.cpp index cb08114bfa4..a3c410b6693 100644 --- a/dbms/src/Storages/MergeTree/ReshardingWorker.cpp +++ b/dbms/src/Storages/MergeTree/ReshardingWorker.cpp @@ -593,9 +593,6 @@ void ReshardingWorker::perform(const std::string & job_descriptor, const std::st auto & storage = typeid_cast(*(generic_storage.get())); current_job.storage = &storage; - /// Protect the source partition from merging jobs. - freezeSourcePartition(); - std::string dumped_coordinator_state; auto handle_exception = [&](const std::string & cancel_msg, const std::string & error_msg) @@ -623,7 +620,6 @@ void ReshardingWorker::perform(const std::string & job_descriptor, const std::st dumped_coordinator_state = dumpCoordinatorState(current_job.coordinator_id); } deletion_lock.release(); - unfreezeSourcePartition(); hardCleanup(); } } @@ -715,11 +711,11 @@ void ReshardingWorker::perform(const std::string & job_descriptor, const std::st { dumped_coordinator_state = dumpCoordinatorState(current_job.coordinator_id); deletion_lock.release(); - unfreezeSourcePartition(); hardCleanup(); } else if (ex.code() == ErrorCodes::RESHARDING_COORDINATOR_DELETED) - unfreezeSourcePartition(); + { + } else if (ex.code() == ErrorCodes::RESHARDING_DISTRIBUTED_JOB_ON_HOLD) { /// The current distributed job is on hold and one or more required performers @@ -757,9 +753,6 @@ void ReshardingWorker::perform(const std::string & job_descriptor, const std::st } deletion_lock.release(); - /// Although the source partition has been dropped, the following function - /// must be called in order to delete all the data that makes freezing fail-safe. - unfreezeSourcePartition(); hardCleanup(); LOG_DEBUG(log, "Resharding job successfully completed."); @@ -2505,25 +2498,6 @@ zkutil::SingleBarrier ReshardingWorker::getCommitBarrier() return commit_barrier; } -void ReshardingWorker::freezeSourcePartition() -{ - auto zookeeper = context.getZooKeeper(); - auto & storage = *(current_job.storage); - - zookeeper->createIfNotExists(storage.replica_path + "/frozen_partitions/" - + current_job.partition, ""); - storage.merger.freezePartition(current_job.partition); -} - -void ReshardingWorker::unfreezeSourcePartition() -{ - auto zookeeper = context.getZooKeeper(); - auto & storage = *(current_job.storage); - - zookeeper->remove(storage.replica_path + "/frozen_partitions/" + current_job.partition); - storage.merger.unfreezePartition(current_job.partition); -} - std::string ReshardingWorker::getCoordinatorPath(const std::string & coordinator_id) const { return coordination_path + "/" + coordinator_id; diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index a69714fe7b1..10cd9308345 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -293,11 +293,6 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( queue.pullLogsToQueue(current_zookeeper, nullptr); - /// Замораживаем партиции, которые прежне были в этом состоянии. - auto frozen_partitions = current_zookeeper->getChildren(replica_path + "/frozen_partitions"); - for (const auto & partition : frozen_partitions) - merger.freezePartition(partition); - /// В этом потоке реплика будет активирована. restarting_thread.reset(new ReplicatedMergeTreeRestartingThread(*this)); } @@ -315,9 +310,6 @@ void StorageReplicatedMergeTree::createNewZooKeeperNodes() /// Отслеживание отставания реплик. zookeeper->createIfNotExists(replica_path + "/min_unprocessed_insert_time", ""); zookeeper->createIfNotExists(replica_path + "/max_processed_insert_time", ""); - - /// Партиции, для которых временно не проводятся слияния. - zookeeper->createIfNotExists(replica_path + "/frozen_partitions", ""); }