From 355150afce926df5e3dc6cd0c93386a687a8acd4 Mon Sep 17 00:00:00 2001 From: alesapin Date: Wed, 22 Jul 2020 22:29:54 +0300 Subject: [PATCH] Fix deadlock and better code --- .../MergeTree/MergeTreeMutationStatus.cpp | 1 + src/Storages/StorageMergeTree.cpp | 93 +++++++++---------- src/Storages/StorageMergeTree.h | 4 + 3 files changed, 47 insertions(+), 51 deletions(-) diff --git a/src/Storages/MergeTree/MergeTreeMutationStatus.cpp b/src/Storages/MergeTree/MergeTreeMutationStatus.cpp index 4a22ec9a922..4819cf9b2a9 100644 --- a/src/Storages/MergeTree/MergeTreeMutationStatus.cpp +++ b/src/Storages/MergeTree/MergeTreeMutationStatus.cpp @@ -1,4 +1,5 @@ #include + #include #include diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index 3710e8d6779..8f58fd0ba4d 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -284,9 +284,6 @@ struct CurrentlyMergingPartsTagger FutureMergedMutatedPart future_part; ReservationPtr reserved_space; - bool is_successful = false; - String exception_message; - StorageMergeTree & storage; public: @@ -339,44 +336,6 @@ public: storage.currently_merging_mutating_parts.erase(part); } - /// Update the information about failed parts in the system.mutations table. - - Int64 sources_data_version = future_part.parts.at(0)->info.getDataVersion(); - Int64 result_data_version = future_part.part_info.getDataVersion(); - auto mutations_begin_it = storage.current_mutations_by_version.end(); - auto mutations_end_it = storage.current_mutations_by_version.end(); - if (sources_data_version != result_data_version) - { - mutations_begin_it = storage.current_mutations_by_version.upper_bound(sources_data_version); - mutations_end_it = storage.current_mutations_by_version.upper_bound(result_data_version); - } - - for (auto it = mutations_begin_it; it != mutations_end_it; ++it) - { - MergeTreeMutationEntry & entry = it->second; - if (is_successful) - { - if (!entry.latest_failed_part.empty() && future_part.part_info.contains(entry.latest_failed_part_info)) - { - entry.latest_failed_part.clear(); - entry.latest_failed_part_info = MergeTreePartInfo(); - entry.latest_fail_time = 0; - entry.latest_fail_reason.clear(); - } - } - else - { - entry.latest_failed_part = future_part.parts.at(0)->name; - entry.latest_failed_part_info = future_part.parts.at(0)->info; - entry.latest_fail_time = time(nullptr); - entry.latest_fail_reason = exception_message; - { - std::lock_guard lock_mutation_wait(storage.mutation_wait_mutex); - storage.mutation_wait_event.notify_all(); - } - } - } - storage.currently_processing_in_background_condition.notify_all(); } }; @@ -402,6 +361,46 @@ Int64 StorageMergeTree::startMutation(const MutationCommands & commands, String return version; } + +void StorageMergeTree::updateMutationEntriesErrors(FutureMergedMutatedPart result_part, bool is_successful, const String & exception_message) +{ + /// Update the information about failed parts in the system.mutations table. + + Int64 sources_data_version = result_part.parts.at(0)->info.getDataVersion(); + Int64 result_data_version = result_part.part_info.getDataVersion(); + if (sources_data_version != result_data_version) + { + std::lock_guard lock(currently_processing_in_background_mutex); + auto mutations_begin_it = current_mutations_by_version.upper_bound(sources_data_version); + auto mutations_end_it = current_mutations_by_version.upper_bound(result_data_version); + + for (auto it = mutations_begin_it; it != mutations_end_it; ++it) + { + MergeTreeMutationEntry & entry = it->second; + if (is_successful) + { + if (!entry.latest_failed_part.empty() && result_part.part_info.contains(entry.latest_failed_part_info)) + { + entry.latest_failed_part.clear(); + entry.latest_failed_part_info = MergeTreePartInfo(); + entry.latest_fail_time = 0; + entry.latest_fail_reason.clear(); + } + } + else + { + entry.latest_failed_part = result_part.parts.at(0)->name; + entry.latest_failed_part_info = result_part.parts.at(0)->info; + entry.latest_fail_time = time(nullptr); + entry.latest_fail_reason = exception_message; + } + } + } + + std::unique_lock lock(mutation_wait_mutex); + mutation_wait_event.notify_all(); +} + void StorageMergeTree::waitForMutation(Int64 version, const String & file_name) { LOG_INFO(log, "Waiting mutation: {}", file_name); @@ -733,12 +732,10 @@ bool StorageMergeTree::merge( merging_tagger->reserved_space, deduplicate, force_ttl); merger_mutator.renameMergedTemporaryPart(new_part, future_part.parts, nullptr); - merging_tagger->is_successful = true; write_part_log({}); } catch (...) { - merging_tagger->exception_message = getCurrentExceptionMessage(false); write_part_log(ExecutionStatus::fromCurrentException()); throw; } @@ -875,18 +872,12 @@ bool StorageMergeTree::tryMutatePart() renameTempPartAndReplace(new_part); - tagger->is_successful = true; + updateMutationEntriesErrors(future_part, true, ""); write_part_log({}); - - /// Notify all, who wait for this or previous mutations - { - std::lock_guard lock(mutation_wait_mutex); - mutation_wait_event.notify_all(); - } } catch (...) { - tagger->exception_message = getCurrentExceptionMessage(false); + updateMutationEntriesErrors(future_part, false, getCurrentExceptionMessage(false)); write_part_log(ExecutionStatus::fromCurrentException()); throw; } diff --git a/src/Storages/StorageMergeTree.h b/src/Storages/StorageMergeTree.h index e52792b66c3..4bc7a1fbd98 100644 --- a/src/Storages/StorageMergeTree.h +++ b/src/Storages/StorageMergeTree.h @@ -153,6 +153,10 @@ private: void replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, const Context & context); void movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, const Context & context); bool partIsAssignedToBackgroundOperation(const DataPartPtr & part) const override; + /// Update mutation entries after part mutation execution. May reset old + /// errors if mutation was successful. Otherwise update last_failed* fields + /// in mutation entries. + void updateMutationEntriesErrors(FutureMergedMutatedPart result_part, bool is_successful, const String & exception_message); /// Return empty optional if mutation was killed. Otherwise return partially /// filled mutation status with information about error (latest_fail*) and