diff --git a/src/Storages/MergeTree/MergeTreeMutationStatus.h b/src/Storages/MergeTree/MergeTreeMutationStatus.h index 3a9ecf30eb1..b70843ed49e 100644 --- a/src/Storages/MergeTree/MergeTreeMutationStatus.h +++ b/src/Storages/MergeTree/MergeTreeMutationStatus.h @@ -1,12 +1,15 @@ #pragma once #include +#include +#include #include namespace DB { + struct MergeTreeMutationStatus { String id; @@ -25,4 +28,9 @@ struct MergeTreeMutationStatus String latest_fail_reason; }; +/// Check mutation status and throw exception in case of error during mutation +/// (latest_fail_reason not empty) or if mutation was killed (status empty +/// optional). mutation_id passed separately, because status may be empty. +void checkMutationStatus(std::optional & status, const String & mutation_id); + } diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index 3deb61bf8db..e45a9ace7f2 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -1538,6 +1538,26 @@ void ReplicatedMergeTreeQueue::getInsertTimes(time_t & out_min_unprocessed_inser } +std::optional ReplicatedMergeTreeQueue::getIncompleteMutationStatus(const String & znode_name) const +{ + + std::lock_guard lock(state_mutex); + auto it = mutations_by_znode.find(znode_name); + /// killed + if (it == mutations_by_znode.end()) + return {}; + + const MutationStatus & status = it->second; + MergeTreeMutationStatus result + { + .is_done = status.is_done, + .latest_failed_part = status.latest_failed_part, + .latest_fail_time = status.latest_fail_time, + .latest_fail_reason = status.latest_fail_reason, + }; + return result; +} + std::vector ReplicatedMergeTreeQueue::getMutationsStatus() const { std::lock_guard lock(state_mutex); diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h index cd155214cac..6cb0d8166c3 100644 --- 
a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h @@ -399,6 +399,12 @@ public: /// Get information about the insertion times. void getInsertTimes(time_t & out_min_unprocessed_insert_time, time_t & out_max_processed_insert_time) const; + + /// Return empty optional if mutation was killed. Otherwise return partially + /// filled mutation status with information about error (latest_fail*) and + /// is_done. + std::optional getIncompleteMutationStatus(const String & znode_name) const; + std::vector getMutationsStatus() const; void removeCurrentPartsFromMutations(); diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index eecde28b7bb..fb76160c687 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -40,6 +40,7 @@ namespace ErrorCodes extern const int BAD_ARGUMENTS; extern const int INCORRECT_DATA; extern const int CANNOT_ASSIGN_OPTIMIZE; + extern const int UNFINISHED; } namespace ActionLocks @@ -134,7 +135,7 @@ void StorageMergeTree::shutdown() /// Unlock all waiting mutations { - std::lock_guard lock(mutation_wait_mutex); + std::lock_guard lock(mutation_wait_mutex); mutation_wait_event.notify_all(); } @@ -370,6 +371,10 @@ public: entry.latest_failed_part_info = future_part.parts.at(0)->info; entry.latest_fail_time = time(nullptr); entry.latest_fail_reason = exception_message; + { + std::lock_guard lock_mutation_wait(storage.mutation_wait_mutex); + storage.mutation_wait_event.notify_all(); + } } } @@ -401,9 +406,22 @@ Int64 StorageMergeTree::startMutation(const MutationCommands & commands, String void StorageMergeTree::waitForMutation(Int64 version, const String & file_name) { LOG_INFO(log, "Waiting mutation: {}", file_name); - auto check = [version, this]() { return shutdown_called || isMutationDone(version); }; - std::unique_lock lock(mutation_wait_mutex); - mutation_wait_event.wait(lock, check); + { + auto check = [version, this]() + { + if 
(shutdown_called) + return true; + auto mutation_status = getIncompleteMutationStatus(version); + return !mutation_status || mutation_status->is_done || !mutation_status->latest_fail_reason.empty(); + }; + + std::unique_lock lock(mutation_wait_mutex); + mutation_wait_event.wait(lock, check); + } + + auto mutation_status = getIncompleteMutationStatus(version); + checkMutationStatus(mutation_status, file_name); + LOG_INFO(log, "Mutation {} done", file_name); } @@ -432,20 +450,37 @@ bool comparator(const PartVersionWithName & f, const PartVersionWithName & s) } - -bool StorageMergeTree::isMutationDone(Int64 mutation_version) const +std::optional StorageMergeTree::getIncompleteMutationStatus(Int64 mutation_version) const { std::lock_guard lock(currently_processing_in_background_mutex); + auto it = current_mutations_by_version.find(mutation_version); /// Killed - if (!current_mutations_by_version.count(mutation_version)) - return true; + if (it == current_mutations_by_version.end()) + return {}; + + MergeTreeMutationStatus result{.is_done = false}; + + const auto & mutation_entry = it->second; auto data_parts = getDataPartsVector(); for (const auto & data_part : data_parts) + { if (data_part->info.getDataVersion() < mutation_version) - return false; - return true; + { + if (!mutation_entry.latest_fail_reason.empty()) + { + result.latest_failed_part = mutation_entry.latest_failed_part; + result.latest_fail_reason = mutation_entry.latest_fail_reason; + result.latest_fail_time = mutation_entry.latest_fail_time; + } + + return result; + } + } + + result.is_done = true; + return result; } @@ -474,7 +509,6 @@ std::vector StorageMergeTree::getMutationsStatus() cons std::vector result; for (const auto & kv : current_mutations_by_version) { - Int64 mutation_version = kv.first; const MergeTreeMutationEntry & entry = kv.second; const PartVersionWithName needle{mutation_version, ""}; @@ -500,7 +534,7 @@ std::vector StorageMergeTree::getMutationsStatus() cons entry.create_time, 
block_numbers_map, parts_to_do_names, - parts_to_do_names.empty(), + /* is_done = */parts_to_do_names.empty(), entry.latest_failed_part, entry.latest_fail_time, entry.latest_fail_reason, @@ -833,7 +867,7 @@ bool StorageMergeTree::tryMutatePart() /// Notify all, who wait for this or previous mutations { - std::lock_guard lock(mutation_wait_mutex); + std::lock_guard lock(mutation_wait_mutex); mutation_wait_event.notify_all(); } } diff --git a/src/Storages/StorageMergeTree.h b/src/Storages/StorageMergeTree.h index c80c9f44377..b0686f7feb2 100644 --- a/src/Storages/StorageMergeTree.h +++ b/src/Storages/StorageMergeTree.h @@ -154,8 +154,10 @@ private: void movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, const Context & context); bool partIsAssignedToBackgroundOperation(const DataPartPtr & part) const override; - /// Just checks versions of each active data part - bool isMutationDone(Int64 mutation_version) const; + /// Return empty optional if mutation was killed. Otherwise return partially + /// filled mutation status with information about error (latest_fail*) and + /// is_done. 
+ std::optional getIncompleteMutationStatus(Int64 mutation_version) const; void startBackgroundMovesIfNeeded() override; diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index c6dc3e67b80..43f4ff2327e 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -348,7 +348,6 @@ void StorageReplicatedMergeTree::waitMutationToFinishOnReplicas( std::set inactive_replicas; for (const String & replica : replicas) { - LOG_DEBUG(log, "Waiting for {} to apply mutation {}", replica, mutation_id); while (!partial_shutdown_called) @@ -358,8 +357,7 @@ void StorageReplicatedMergeTree::waitMutationToFinishOnReplicas( Coordination::Stat exists_stat; if (!getZooKeeper()->exists(zookeeper_path + "/mutations/" + mutation_id, &exists_stat, wait_event)) { - LOG_WARNING(log, "Mutation {} was killed or manually removed. Nothing to wait.", mutation_id); - return; + throw Exception(ErrorCodes::UNFINISHED, "Mutation {} was killed, manually removed or table was dropped", mutation_id); } auto zookeeper = getZooKeeper(); @@ -387,8 +385,22 @@ void StorageReplicatedMergeTree::waitMutationToFinishOnReplicas( /// Replica can become inactive, so wait with timeout and recheck it if (wait_event->tryWait(1000)) break; + + auto mutation_status = queue.getIncompleteMutationStatus(mutation_id); + if (!mutation_status || !mutation_status->latest_fail_reason.empty()) + break; } + /// It may already be removed from ZooKeeper, but the local in-memory mutations + /// state was not updated. 
+ if (!getZooKeeper()->exists(zookeeper_path + "/mutations/" + mutation_id)) + { + throw Exception(ErrorCodes::UNFINISHED, "Mutation {} was killed, manually removed or table was dropped", mutation_id); + } + + auto mutation_status = queue.getIncompleteMutationStatus(mutation_id); + checkMutationStatus(mutation_status, mutation_id); + if (partial_shutdown_called) throw Exception("Mutation is not finished because table shutdown was called. It will be done after table restart.", ErrorCodes::UNFINISHED); diff --git a/tests/queries/0_stateless/01414_mutations_and_errors.reference b/tests/queries/0_stateless/01414_mutations_and_errors.reference new file mode 100644 index 00000000000..166a9c6b7b8 --- /dev/null +++ b/tests/queries/0_stateless/01414_mutations_and_errors.reference @@ -0,0 +1,4 @@ +42 +Hello +42 +Hello diff --git a/tests/queries/0_stateless/01414_mutations_and_errors.sql b/tests/queries/0_stateless/01414_mutations_and_errors.sql new file mode 100644 index 00000000000..af7eeb8b9ee --- /dev/null +++ b/tests/queries/0_stateless/01414_mutations_and_errors.sql @@ -0,0 +1,29 @@ +DROP TABLE IF EXISTS mutation_table; + +CREATE TABLE mutation_table +( + date Date, + key UInt64, + value String +) +ENGINE = MergeTree() +PARTITION BY date +ORDER BY tuple(); + +INSERT INTO mutation_table SELECT toDate('2019-10-01'), number, '42' FROM numbers(100); + +INSERT INTO mutation_table SELECT toDate('2019-10-02'), number, 'Hello' FROM numbers(100); + +SELECT distinct(value) FROM mutation_table ORDER BY value; + +ALTER TABLE mutation_table MODIFY COLUMN value UInt64 SETTINGS mutations_sync = 2; --{serverError 341} + +SELECT distinct(value) FROM mutation_table ORDER BY value; --{serverError 6} + +KILL MUTATION where table = 'mutation_table' and database = currentDatabase(); + +ALTER TABLE mutation_table MODIFY COLUMN value String SETTINGS mutations_sync = 2; + +SELECT distinct(value) FROM mutation_table ORDER BY value; + +DROP TABLE IF EXISTS mutation_table; diff --git 
a/tests/queries/0_stateless/01414_mutations_and_errors_zookeeper.reference b/tests/queries/0_stateless/01414_mutations_and_errors_zookeeper.reference new file mode 100644 index 00000000000..a55134cbe31 --- /dev/null +++ b/tests/queries/0_stateless/01414_mutations_and_errors_zookeeper.reference @@ -0,0 +1,5 @@ +Mutation 0000000000 was killed +Cannot parse string 'Hello' as UInt64 +Cannot parse string 'Hello' as UInt64 +42 +Hello diff --git a/tests/queries/0_stateless/01414_mutations_and_errors_zookeeper.sh b/tests/queries/0_stateless/01414_mutations_and_errors_zookeeper.sh new file mode 100755 index 00000000000..9881b1f7def --- /dev/null +++ b/tests/queries/0_stateless/01414_mutations_and_errors_zookeeper.sh @@ -0,0 +1,73 @@ +#!/usr/bin/env bash + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +. $CURDIR/../shell_config.sh + +$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS replicated_mutation_table" + +$CLICKHOUSE_CLIENT --query " + CREATE TABLE replicated_mutation_table( + date Date, + key UInt64, + value String + ) + ENGINE = ReplicatedMergeTree('/clickhouse/tables/mutation_table', '1') + ORDER BY tuple() + PARTITION BY date +" + +$CLICKHOUSE_CLIENT --query "INSERT INTO replicated_mutation_table SELECT toDate('2019-10-02'), number, '42' FROM numbers(4)" + +$CLICKHOUSE_CLIENT --query "INSERT INTO replicated_mutation_table SELECT toDate('2019-10-02'), number, 'Hello' FROM numbers(4)" + +$CLICKHOUSE_CLIENT --query "ALTER TABLE replicated_mutation_table UPDATE key = key + 1 WHERE sleepEachRow(1) == 0 SETTINGS mutations_sync = 2" 2>&1 | grep -o 'Mutation 0000000000 was killed' | head -n 1 & + +check_query="SELECT count() FROM system.mutations WHERE table='replicated_mutation_table' and database='$CLICKHOUSE_DATABASE' and mutation_id='0000000000'" + +query_result=`$CLICKHOUSE_CLIENT --query="$check_query" 2>&1` + +while [ "$query_result" != "1" ] +do + query_result=`$CLICKHOUSE_CLIENT --query="$check_query" 2>&1` + sleep 0.5 +done + +$CLICKHOUSE_CLIENT --query 
"KILL MUTATION WHERE table='replicated_mutation_table' and database='$CLICKHOUSE_DATABASE' and mutation_id='0000000000'" &> /dev/null + +while [ "$query_result" != "0" ] +do + query_result=`$CLICKHOUSE_CLIENT --query="$check_query" 2>&1` + sleep 0.5 +done + +wait + +$CLICKHOUSE_CLIENT --query "ALTER TABLE replicated_mutation_table MODIFY COLUMN value UInt64 SETTINGS replication_alter_partitions_sync = 2" 2>&1 | grep -o "Cannot parse string 'Hello' as UInt64" | head -n 1 & + +check_query="SELECT count() FROM system.mutations WHERE table='replicated_mutation_table' and database='$CLICKHOUSE_DATABASE' and mutation_id='0000000001'" + +query_result=`$CLICKHOUSE_CLIENT --query="$check_query" 2>&1` + +while [ "$query_result" != "1" ] +do + query_result=`$CLICKHOUSE_CLIENT --query="$check_query" 2>&1` + sleep 0.5 +done + +wait + +$CLICKHOUSE_CLIENT --query "KILL MUTATION WHERE table='replicated_mutation_table' and database='$CLICKHOUSE_DATABASE' AND mutation_id='0000000001'" &> /dev/null + +while [ "$query_result" != "0" ] +do + query_result=`$CLICKHOUSE_CLIENT --query="$check_query" 2>&1` + sleep 0.5 +done + +$CLICKHOUSE_CLIENT --query "SELECT distinct(value) FROM replicated_mutation_table ORDER BY value" 2>&1 | grep -o "Cannot parse string 'Hello' as UInt64" | head -n 1 + +$CLICKHOUSE_CLIENT --query "ALTER TABLE replicated_mutation_table MODIFY COLUMN value String SETTINGS replication_alter_partitions_sync = 2" + +$CLICKHOUSE_CLIENT --query "SELECT distinct(value) FROM replicated_mutation_table ORDER BY value" + +$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS replicated_mutation_table"