From 76156af5cc30b8706e2b9527811706cce99a452d Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Thu, 24 Jun 2021 17:07:43 +0300 Subject: [PATCH] cancel merges on drop partition --- src/Storages/MergeTree/MergeList.cpp | 4 ++-- src/Storages/MergeTree/MergeList.h | 16 ++++++++++++++-- .../MergeTree/MergeTreeDataMergerMutator.cpp | 8 ++++++-- src/Storages/StorageReplicatedMergeTree.cpp | 8 ++++++++ 4 files changed, 30 insertions(+), 6 deletions(-) diff --git a/src/Storages/MergeTree/MergeList.cpp b/src/Storages/MergeTree/MergeList.cpp index c6f9459d0db..24beb0cc06f 100644 --- a/src/Storages/MergeTree/MergeList.cpp +++ b/src/Storages/MergeTree/MergeList.cpp @@ -13,7 +13,7 @@ MergeListElement::MergeListElement(const StorageID & table_id_, const FutureMerg , partition_id{future_part.part_info.partition_id} , result_part_name{future_part.name} , result_part_path{future_part.path} - , result_data_version{future_part.part_info.getDataVersion()} + , result_part_info{future_part.part_info} , num_parts{future_part.parts.size()} , thread_id{getThreadId()} , merge_type{future_part.merge_type} @@ -32,7 +32,7 @@ MergeListElement::MergeListElement(const StorageID & table_id_, const FutureMerg if (!future_part.parts.empty()) { source_data_version = future_part.parts[0]->info.getDataVersion(); - is_mutation = (result_data_version != source_data_version); + is_mutation = (result_part_info.getDataVersion() != source_data_version); } /// Each merge is executed into separate background processing pool thread diff --git a/src/Storages/MergeTree/MergeList.h b/src/Storages/MergeTree/MergeList.h index 9680ce6ac30..6f4aedcc6f8 100644 --- a/src/Storages/MergeTree/MergeList.h +++ b/src/Storages/MergeTree/MergeList.h @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include @@ -60,7 +61,7 @@ struct MergeListElement : boost::noncopyable const std::string result_part_name; const std::string result_part_path; - Int64 result_data_version{}; + MergeTreePartInfo result_part_info; bool is_mutation{}; UInt64 num_parts{}; @@ -130,7 +131,18 @@ public: if ((partition_id.empty() || merge_element.partition_id == partition_id) && merge_element.table_id == table_id && merge_element.source_data_version < mutation_version - && merge_element.result_data_version >= mutation_version) + && merge_element.result_part_info.getDataVersion() >= mutation_version) + merge_element.is_cancelled = true; + } + } + + void cancelInPartition(const StorageID & table_id, const String & partition_id, Int64 delimiting_block_number) + { + for (auto & merge_element : entries) + { + if (merge_element.table_id == table_id + && merge_element.partition_id == partition_id + && merge_element.result_part_info.min_block < delimiting_block_number) merge_element.is_cancelled = true; } } diff --git a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index 846ad7b026d..b4f3d433f66 100644 --- a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -951,8 +951,12 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor size_t rows_written = 0; const size_t initial_reservation = space_reservation ? space_reservation->getSize() : 0; - auto is_cancelled = [&]() { return merges_blocker.isCancelled() - || (need_remove_expired_values && ttl_merges_blocker.isCancelled()); }; + auto is_cancelled = [&]() + { + return merges_blocker.isCancelled() + || (need_remove_expired_values && ttl_merges_blocker.isCancelled()) + || merge_entry->is_cancelled.load(std::memory_order_relaxed); + }; Block block; while (!is_cancelled() && (block = merged_stream->read())) diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 47f6bbd0ccc..75ff8a93980 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -2194,6 +2194,7 @@ bool StorageReplicatedMergeTree::executeFetchShared( void StorageReplicatedMergeTree::executeDropRange(const LogEntry & entry) { auto drop_range_info = MergeTreePartInfo::fromPartName(entry.new_part_name, format_version); + getContext()->getMergeList().cancelInPartition(getStorageID(), drop_range_info.partition_id, drop_range_info.max_block); queue.removePartProducingOpsInRange(getZooKeeper(), drop_range_info, entry); if (entry.detach) @@ -2253,9 +2254,14 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry) bool replace = !LogEntry::ReplaceRangeEntry::isMovePartitionOrAttachFrom(drop_range); if (replace) + { + getContext()->getMergeList().cancelInPartition(getStorageID(), drop_range.partition_id, drop_range.max_block); queue.removePartProducingOpsInRange(getZooKeeper(), drop_range, entry); + } else + { drop_range = {}; + } struct PartDescription { @@ -7094,6 +7100,8 @@ bool StorageReplicatedMergeTree::dropAllPartsInPartition( String log_znode_path = dynamic_cast(*responses.front()).path_created; entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); + getContext()->getMergeList().cancelInPartition(getStorageID(), partition_id, drop_range_info.max_block); + return true; }