From 8c8bdd507074591e33df8d1f1522996ae1324dcd Mon Sep 17 00:00:00 2001 From: alesapin Date: Wed, 22 Jul 2020 18:19:54 +0300 Subject: [PATCH] Fix tests and work with multiple mutations --- .../MergeTree/MergeTreeMutationStatus.cpp | 33 +++++++++++++++++++ .../MergeTree/MergeTreeMutationStatus.h | 5 +-- .../MergeTree/ReplicatedMergeTreeQueue.cpp | 25 +++++++++++--- .../MergeTree/ReplicatedMergeTreeQueue.h | 5 +-- src/Storages/StorageMergeTree.cpp | 28 ++++++++++++---- src/Storages/StorageMergeTree.h | 5 +-- src/Storages/StorageReplicatedMergeTree.cpp | 7 ++-- src/Storages/ya.make | 1 + .../0_stateless/00834_kill_mutation.reference | 1 + .../0_stateless/00834_kill_mutation.sh | 4 +-- ...ll_mutation_replicated_zookeeper.reference | 2 +- ...0834_kill_mutation_replicated_zookeeper.sh | 12 ++++++- 12 files changed, 104 insertions(+), 24 deletions(-) create mode 100644 src/Storages/MergeTree/MergeTreeMutationStatus.cpp diff --git a/src/Storages/MergeTree/MergeTreeMutationStatus.cpp b/src/Storages/MergeTree/MergeTreeMutationStatus.cpp new file mode 100644 index 00000000000..4a22ec9a922 --- /dev/null +++ b/src/Storages/MergeTree/MergeTreeMutationStatus.cpp @@ -0,0 +1,33 @@ +#include +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int UNFINISHED; +} + +void checkMutationStatus(std::optional & status, const Strings & mutation_ids) +{ + if (!status) + { + assert(mutation_ids.size() == 1); + throw Exception(ErrorCodes::UNFINISHED, "Mutation {} was killed", mutation_ids[0]); + } + else if (!status->is_done && !status->latest_fail_reason.empty()) + { + throw Exception( + ErrorCodes::UNFINISHED, + "Exception happened during execution of mutation{} '{}' with part '{}' reason: '{}'. This error maybe retryable or not. " + "In case of unretryable error, mutation can be killed with KILL MUTATION query", + mutation_ids.size() > 1 ? "s" : "", + boost::algorithm::join(mutation_ids, ", "), + status->latest_failed_part, + status->latest_fail_reason); + } +} + +} diff --git a/src/Storages/MergeTree/MergeTreeMutationStatus.h b/src/Storages/MergeTree/MergeTreeMutationStatus.h index b70843ed49e..0e3a3f6ed4b 100644 --- a/src/Storages/MergeTree/MergeTreeMutationStatus.h +++ b/src/Storages/MergeTree/MergeTreeMutationStatus.h @@ -30,7 +30,8 @@ struct MergeTreeMutationStatus /// Check mutation status and throw exception in case of error during mutation /// (latest_fail_reason not empty) or if mutation was killed (status empty -/// optional). mutation_id passed separately, because status may be empty. -void checkMutationStatus(std::optional & status, const String & mutation_id); +/// optional). mutation_ids passed separately, because status may be empty and +/// we can execute multiple mutations at once +void checkMutationStatus(std::optional & status, const Strings & mutation_ids); } diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index e45a9ace7f2..4ff63f9c2de 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -1538,16 +1538,16 @@ void ReplicatedMergeTreeQueue::getInsertTimes(time_t & out_min_unprocessed_inser } -std::optional ReplicatedMergeTreeQueue::getIncompleteMutationStatus(const String & znode_name) const +std::optional ReplicatedMergeTreeQueue::getIncompleteMutationsStatus(const String & znode_name, Strings * mutation_ids) const { std::lock_guard lock(state_mutex); - auto it = mutations_by_znode.find(znode_name); + auto current_mutation_it = mutations_by_znode.find(znode_name); /// killed - if (it == mutations_by_znode.end()) + if (current_mutation_it == mutations_by_znode.end()) return {}; - const MutationStatus & status = it->second; + const MutationStatus & status = current_mutation_it->second; MergeTreeMutationStatus result { .is_done = status.is_done, @@ -1555,6 +1555,23 @@ std::optional ReplicatedMergeTreeQueue::getIncompleteMu .latest_fail_time = status.latest_fail_time, .latest_fail_reason = status.latest_fail_reason, }; + + if (mutation_ids && !status.latest_fail_reason.empty()) + { + const auto & latest_failed_part_info = status.latest_failed_part_info; + auto in_partition = mutations_by_partition.find(latest_failed_part_info.partition_id); + if (in_partition != mutations_by_partition.end()) + { + const auto & version_to_status = in_partition->second; + auto begin_it = version_to_status.upper_bound(latest_failed_part_info.getDataVersion()); + for (auto it = begin_it; it != version_to_status.end(); ++it) + { + /// All mutations with the same failure + if (!it->second->is_done && it->second->latest_fail_reason == status.latest_fail_reason) + mutation_ids->push_back(it->second->entry->znode_name); + } + } + } return result; } diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h index 6cb0d8166c3..c330631d9dd 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h @@ -402,8 +402,9 @@ public: /// Return empty optional if mutation was killed. Otherwise return partially /// filled mutation status with information about error (latest_fail*) and - /// is_done. - std::optional getIncompleteMutationStatus(const String & znode_name) const; + /// is_done. mutation_ids filled with all mutations with same errors, because + /// they may be executed simultaneously as one mutation. + std::optional getIncompleteMutationsStatus(const String & znode_name, Strings * mutation_ids = nullptr) const; std::vector getMutationsStatus() const; diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index 3efd0871192..3710e8d6779 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -410,7 +410,7 @@ void StorageMergeTree::waitForMutation(Int64 version, const String & file_name) { if (shutdown_called) return true; - auto mutation_status = getIncompleteMutationStatus(version); + auto mutation_status = getIncompleteMutationsStatus(version); return !mutation_status || mutation_status->is_done || !mutation_status->latest_fail_reason.empty(); }; @@ -418,8 +418,9 @@ void StorageMergeTree::waitForMutation(Int64 version, const String & file_name) mutation_wait_event.wait(lock, check); } - auto mutation_status = getIncompleteMutationStatus(version); - checkMutationStatus(mutation_status, file_name); + Strings mutation_ids; + auto mutation_status = getIncompleteMutationsStatus(version, &mutation_ids); + checkMutationStatus(mutation_status, mutation_ids); LOG_INFO(log, "Mutation {} done", file_name); } @@ -449,29 +450,42 @@ bool comparator(const PartVersionWithName & f, const PartVersionWithName & s) } -std::optional StorageMergeTree::getIncompleteMutationStatus(Int64 mutation_version) const +std::optional StorageMergeTree::getIncompleteMutationsStatus(Int64 mutation_version, Strings * mutation_ids) const { std::lock_guard lock(currently_processing_in_background_mutex); - auto it = current_mutations_by_version.find(mutation_version); + auto current_mutation_it = current_mutations_by_version.find(mutation_version); /// Killed - if (it == current_mutations_by_version.end()) + if (current_mutation_it == current_mutations_by_version.end()) return {}; MergeTreeMutationStatus result{.is_done = false}; - const auto & mutation_entry = it->second; + const auto & mutation_entry = current_mutation_it->second; auto data_parts = getDataPartsVector(); for (const auto & data_part : data_parts) { if (data_part->info.getDataVersion() < mutation_version) { + if (!mutation_entry.latest_fail_reason.empty()) { result.latest_failed_part = mutation_entry.latest_failed_part; result.latest_fail_reason = mutation_entry.latest_fail_reason; result.latest_fail_time = mutation_entry.latest_fail_time; + + /// Fill all mutations which failed with the same error + /// (we can execute several mutations together) + if (mutation_ids) + { + auto mutations_begin_it = current_mutations_by_version.upper_bound(data_part->info.getDataVersion()); + + for (auto it = mutations_begin_it; it != current_mutations_by_version.end(); ++it) + /// All mutations with the same failure + if (it->second.latest_fail_reason == result.latest_fail_reason) + mutation_ids->push_back(it->second.file_name); + } } return result; diff --git a/src/Storages/StorageMergeTree.h b/src/Storages/StorageMergeTree.h index b0686f7feb2..e52792b66c3 100644 --- a/src/Storages/StorageMergeTree.h +++ b/src/Storages/StorageMergeTree.h @@ -156,8 +156,9 @@ private: /// Return empty optional if mutation was killed. Otherwise return partially /// filled mutation status with information about error (latest_fail*) and - /// is_done. - std::optional getIncompleteMutationStatus(Int64 mutation_version) const; + /// is_done. mutation_ids filled with mutations with the same errors, because we + /// can execute several mutations at once + std::optional getIncompleteMutationsStatus(Int64 mutation_version, Strings * mutation_ids = nullptr) const; void startBackgroundMovesIfNeeded() override; diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 43f4ff2327e..4a9acdb74c0 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -386,7 +386,7 @@ void StorageReplicatedMergeTree::waitMutationToFinishOnReplicas( if (wait_event->tryWait(1000)) break; - auto mutation_status = queue.getIncompleteMutationStatus(mutation_id); + auto mutation_status = queue.getIncompleteMutationsStatus(mutation_id); if (!mutation_status || !mutation_status->latest_fail_reason.empty()) break; } @@ -398,8 +398,9 @@ void StorageReplicatedMergeTree::waitMutationToFinishOnReplicas( throw Exception(ErrorCodes::UNFINISHED, "Mutation {} was killed, manually removed or table was dropped", mutation_id); } - auto mutation_status = queue.getIncompleteMutationStatus(mutation_id); - checkMutationStatus(mutation_status, mutation_id); + Strings mutation_ids; + auto mutation_status = queue.getIncompleteMutationsStatus(mutation_id, &mutation_ids); + checkMutationStatus(mutation_status, mutation_ids); if (partial_shutdown_called) throw Exception("Mutation is not finished because table shutdown was called. It will be done after table restart.", diff --git a/src/Storages/ya.make b/src/Storages/ya.make index 632434e1dfc..5f5ec87ba91 100644 --- a/src/Storages/ya.make +++ b/src/Storages/ya.make @@ -87,6 +87,7 @@ SRCS( MergeTree/MergeTreeDataPartWriterOnDisk.cpp MergeTree/MergeTreeReaderInMemory.cpp MergeTree/MergeTreeWriteAheadLog.cpp + MergeTree/MergeTreeMutationStatus.cpp System/attachSystemTables.cpp System/StorageSystemAggregateFunctionCombinators.cpp System/StorageSystemAsynchronousMetrics.cpp diff --git a/tests/queries/0_stateless/00834_kill_mutation.reference b/tests/queries/0_stateless/00834_kill_mutation.reference index 1e4a67b66ea..aa0bbdcdfee 100644 --- a/tests/queries/0_stateless/00834_kill_mutation.reference +++ b/tests/queries/0_stateless/00834_kill_mutation.reference @@ -2,6 +2,7 @@ 1 waiting test kill_mutation mutation_3.txt DELETE WHERE toUInt32(s) = 1 *** Create and kill invalid mutation that blocks another mutation *** +happened during execution of mutations 'mutation_4.txt, mutation_5.txt' 1 waiting test kill_mutation mutation_4.txt DELETE WHERE toUInt32(s) = 1 2001-01-01 2 b diff --git a/tests/queries/0_stateless/00834_kill_mutation.sh b/tests/queries/0_stateless/00834_kill_mutation.sh index a00e52fc23c..8dbc75be90c 100755 --- a/tests/queries/0_stateless/00834_kill_mutation.sh +++ b/tests/queries/0_stateless/00834_kill_mutation.sh @@ -14,7 +14,7 @@ ${CLICKHOUSE_CLIENT} --query="INSERT INTO test.kill_mutation VALUES ('2001-01-01 ${CLICKHOUSE_CLIENT} --query="SELECT '*** Create and kill a single invalid mutation ***'" -${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.kill_mutation DELETE WHERE toUInt32(s) = 1 SETTINGS mutations_sync = 1" & +${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.kill_mutation DELETE WHERE toUInt32(s) = 1 SETTINGS mutations_sync = 1" 2>/dev/null & check_query1="SELECT count() FROM system.mutations WHERE database = 'test' AND table = 'kill_mutation' AND is_done = 0" @@ -41,7 +41,7 @@ ${CLICKHOUSE_CLIENT} --query="SELECT mutation_id FROM system.mutations WHERE dat ${CLICKHOUSE_CLIENT} --query="SELECT '*** Create and kill invalid mutation that blocks another mutation ***'" ${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.kill_mutation DELETE WHERE toUInt32(s) = 1" -${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.kill_mutation DELETE WHERE x = 1 SETTINGS mutations_sync = 1" & +${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.kill_mutation DELETE WHERE x = 1 SETTINGS mutations_sync = 1" 2>&1 | grep -o "happened during execution of mutations 'mutation_4.txt, mutation_5.txt'" | head -n 1 & check_query2="SELECT count() FROM system.mutations WHERE database = 'test' AND table = 'kill_mutation' AND mutation_id = 'mutation_4.txt'" diff --git a/tests/queries/0_stateless/00834_kill_mutation_replicated_zookeeper.reference b/tests/queries/0_stateless/00834_kill_mutation_replicated_zookeeper.reference index d6a82e48836..aaefdaeda56 100644 --- a/tests/queries/0_stateless/00834_kill_mutation_replicated_zookeeper.reference +++ b/tests/queries/0_stateless/00834_kill_mutation_replicated_zookeeper.reference @@ -1,6 +1,6 @@ *** Create and kill a single invalid mutation *** +happened during execution of mutation '0000000000' 1 -Mutation 0000000000 was killed waiting test kill_mutation_r1 0000000000 DELETE WHERE toUInt32(s) = 1 0 *** Create and kill invalid mutation that blocks another mutation *** diff --git a/tests/queries/0_stateless/00834_kill_mutation_replicated_zookeeper.sh b/tests/queries/0_stateless/00834_kill_mutation_replicated_zookeeper.sh index 2aea2e7cfb0..4778c7f5889 100755 --- a/tests/queries/0_stateless/00834_kill_mutation_replicated_zookeeper.sh +++ b/tests/queries/0_stateless/00834_kill_mutation_replicated_zookeeper.sh @@ -18,7 +18,7 @@ ${CLICKHOUSE_CLIENT} --query="INSERT INTO test.kill_mutation_r1 VALUES ('2001-01 ${CLICKHOUSE_CLIENT} --query="SELECT '*** Create and kill a single invalid mutation ***'" # wrong mutation -${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.kill_mutation_r1 DELETE WHERE toUInt32(s) = 1 SETTINGS mutations_sync=2" 2>&1 | grep -o "Mutation 0000000000 was killed" & +${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.kill_mutation_r1 DELETE WHERE toUInt32(s) = 1 SETTINGS mutations_sync=2" 2>&1 | grep -o "happened during execution of mutation '0000000000'" | head -n 1 check_query1="SELECT count() FROM system.mutations WHERE database = 'test' AND table = 'kill_mutation_r1' AND is_done = 0" @@ -51,6 +51,16 @@ ${CLICKHOUSE_CLIENT} --query="SELECT * FROM system.replication_queue WHERE table ${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.kill_mutation_r1 DELETE WHERE toUInt32(s) = 1" +check_query1="SELECT count() FROM system.mutations WHERE database = 'test' AND table = 'kill_mutation_r1' AND is_done = 0" + +query_result=`$CLICKHOUSE_CLIENT --query="$check_query1" 2>&1` + +while [ "$query_result" == "0" ] +do + query_result=`$CLICKHOUSE_CLIENT --query="$check_query1" 2>&1` + sleep 0.5 +done + # good mutation, but blocked with wrong mutation ${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.kill_mutation_r1 DELETE WHERE x = 1 SETTINGS mutations_sync=2" &