From ca93fd483a24210df85daee68084cf5f21831189 Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Mon, 14 Jan 2019 15:25:25 +0300 Subject: [PATCH] kill ordinary MergeTree mutations [#CLICKHOUSE-3912] --- dbms/src/Storages/MergeTree/MergeList.cpp | 10 ++++--- dbms/src/Storages/MergeTree/MergeList.h | 26 ++++++++++++++--- .../MergeTree/MergeTreeDataMergerMutator.cpp | 2 +- dbms/src/Storages/StorageMergeTree.cpp | 28 +++++++++++++++++-- dbms/src/Storages/StorageMergeTree.h | 3 +- 5 files changed, 56 insertions(+), 13 deletions(-) diff --git a/dbms/src/Storages/MergeTree/MergeList.cpp b/dbms/src/Storages/MergeTree/MergeList.cpp index 600aaac2296..b65d53d032d 100644 --- a/dbms/src/Storages/MergeTree/MergeList.cpp +++ b/dbms/src/Storages/MergeTree/MergeList.cpp @@ -15,8 +15,10 @@ namespace DB { MergeListElement::MergeListElement(const std::string & database, const std::string & table, const FutureMergedMutatedPart & future_part) - : database{database}, table{table} - , result_part_name{future_part.name}, num_parts{future_part.parts.size()} + /* partition_id and result_data_version are read from future_part.part_info in the initializer list, so they are set even when future_part.parts is empty */ : database{database}, table{table}, partition_id{future_part.part_info.partition_id} + , result_part_name{future_part.name} + , result_data_version{future_part.part_info.getDataVersion()} + , num_parts{future_part.parts.size()} , thread_number{Poco::ThreadNumber::get()} { for (const auto & source_part : future_part.parts) @@ -31,8 +33,8 @@ MergeListElement::MergeListElement(const std::string & database, const std::stri if (!future_part.parts.empty()) { - partition_id = future_part.parts[0]->info.partition_id; - is_mutation = future_part.parts[0]->info.getDataVersion() != future_part.part_info.getDataVersion(); + /* the source data version is taken from the first source part only */ source_data_version = future_part.parts[0]->info.getDataVersion(); + /* flagged as a mutation when the result data version differs from that of the first source part */ is_mutation = (result_data_version != source_data_version); } /// Each merge is executed into separate background processing pool thread diff --git a/dbms/src/Storages/MergeTree/MergeList.h b/dbms/src/Storages/MergeTree/MergeList.h index
6c8e20332cd..bb41998b391 100644 --- a/dbms/src/Storages/MergeTree/MergeList.h +++ b/dbms/src/Storages/MergeTree/MergeList.h @@ -51,13 +51,20 @@ struct MergeListElement : boost::noncopyable { const std::string database; const std::string table; - const std::string result_part_name; std::string partition_id; - bool is_mutation = false; - Stopwatch watch; - std::atomic progress{}; + + const std::string result_part_name; + Int64 result_data_version{}; + bool is_mutation{}; + UInt64 num_parts{}; Names source_part_names; + Int64 source_data_version{}; + + Stopwatch watch; + std::atomic progress{}; + /* set to true by cancelMutation(); read by the check_not_cancelled lambda in MergeTreeDataMergerMutator */ std::atomic is_cancelled{}; + UInt64 total_size_bytes_compressed{}; UInt64 total_size_marks{}; std::atomic bytes_read_uncompressed{}; @@ -138,6 +145,17 @@ public: res.emplace_back(merge_element.getInfo()); return res; } + + /* Marks as cancelled every element of merges whose version range spans mutation_version, i.e. source_data_version < mutation_version <= result_data_version. The mutex is held for the whole scan. */ void cancelMutation(Int64 mutation_version) + { + std::lock_guard lock{mutex}; + for (auto & merge_element : merges) + { + if (merge_element.source_data_version < mutation_version + && merge_element.result_data_version >= mutation_version) + merge_element.is_cancelled = true; + } + } }; diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index 2291f69630c..7eefeb64616 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -813,7 +813,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor { auto check_not_cancelled = [&]() { - if (actions_blocker.isCancelled()) + /* throws ABORTED when the actions blocker reports cancellation or when this merge entry has its is_cancelled flag set */ if (actions_blocker.isCancelled() || merge_entry->is_cancelled) throw Exception("Cancelled mutating parts", ErrorCodes::ABORTED); return true; diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp index 4a0fbca8196..bd8fd4e223f 100644 --- a/dbms/src/Storages/StorageMergeTree.cpp +++ b/dbms/src/Storages/StorageMergeTree.cpp @@ -336,7 +336,8 @@ void
StorageMergeTree::mutate(const MutationCommands & commands, const Context & Int64 version = increment.get(); entry.commit(version); file_name = entry.file_name; - current_mutations_by_version.emplace(version, std::move(entry)); + /* the entry is moved into the by-id map; the by-version map is then given insertion.first->second, the element that emplace reports for this file name */ auto insertion = current_mutations_by_id.emplace(file_name, std::move(entry)); + current_mutations_by_version.emplace(version, insertion.first->second); } LOG_INFO(log, "Added mutation: " << file_name); @@ -389,7 +390,26 @@ std::vector StorageMergeTree::getMutationsStatus() cons void StorageMergeTree::killMutation(const String & mutation_id) { - LOG_TRACE(log, "KILL MUTATION " << mutation_id); + LOG_TRACE(log, "Killing mutation " << mutation_id); + + /* step 1: under currently_merging_mutex, look the entry up by id, move it out and drop it from both maps; an unknown mutation_id leaves to_kill empty and nothing further happens */ std::optional to_kill; + { + std::lock_guard lock(currently_merging_mutex); + auto it = current_mutations_by_id.find(mutation_id); + if (it != current_mutations_by_id.end()) + { + to_kill.emplace(std::move(it->second)); + current_mutations_by_id.erase(it); + /* NOTE(review): erase by key on a std::multimap removes every element with that key - confirm block numbers are unique per mutation */ current_mutations_by_version.erase(to_kill->block_number); + } + } + + /* step 2: after the lock scope has ended, cancel matching merge list elements and remove the mutation file */ if (to_kill) + { + global_context.getMergeList().cancelMutation(to_kill->block_number); + to_kill->removeFile(); + LOG_TRACE(log, "Cancelled part mutations and removed mutation file " << mutation_id); + } } @@ -402,7 +422,8 @@ void StorageMergeTree::loadMutations() { MergeTreeMutationEntry entry(full_path, it.name()); Int64 block_number = entry.block_number; - current_mutations_by_version.emplace(block_number, std::move(entry)); + /* same two-map registration as in mutate(), keyed here by the directory entry name and the block number of the entry */ auto insertion = current_mutations_by_id.emplace(it.name(), std::move(entry)); + current_mutations_by_version.emplace(block_number, insertion.first->second); } else if (startsWith(it.name(), "tmp_mutation_")) { @@ -720,6 +741,7 @@ void StorageMergeTree::clearOldMutations() for (size_t i = 0; i < to_delete_count; ++i) { mutations_to_delete.push_back(std::move(it->second)); + /* drop the matching by-id element, using the file name of the entry just moved into mutations_to_delete */ current_mutations_by_id.erase(mutations_to_delete.back().file_name); it = current_mutations_by_version.erase(it); } } diff --git a/dbms/src/Storages/StorageMergeTree.h
b/dbms/src/Storages/StorageMergeTree.h index d02d73782db..302e9ea6c11 100644 --- a/dbms/src/Storages/StorageMergeTree.h +++ b/dbms/src/Storages/StorageMergeTree.h @@ -117,7 +117,8 @@ private: mutable std::mutex currently_merging_mutex; MergeTreeData::DataParts currently_merging; - std::multimap current_mutations_by_version; + /* mutation entries keyed by mutation file name */ std::map current_mutations_by_id; + /* mutation entries keyed by block number (version); filled from the element stored in current_mutations_by_id */ std::multimap current_mutations_by_version; Logger * log;