From 9fea941009e88f59124c14cebfa532dc277989d5 Mon Sep 17 00:00:00 2001 From: alesapin Date: Thu, 19 Dec 2019 18:27:56 +0300 Subject: [PATCH] Make wait endless --- dbms/src/Core/Settings.h | 2 +- .../MergeTree/MergeTreeMutationEntry.h | 1 + dbms/src/Storages/StorageMergeTree.cpp | 16 ++-- .../Storages/StorageReplicatedMergeTree.cpp | 94 +++++++++---------- .../src/Storages/StorageReplicatedMergeTree.h | 11 +-- .../0_stateless/00834_kill_mutation.sh | 8 +- ...ll_mutation_replicated_zookeeper.reference | 1 + ...0834_kill_mutation_replicated_zookeeper.sh | 33 ++++++- ..._zookeeper_synchronous_mutations.reference | 2 - .../01049_zookeeper_synchronous_mutations.sql | 13 +-- 10 files changed, 95 insertions(+), 86 deletions(-) diff --git a/dbms/src/Core/Settings.h b/dbms/src/Core/Settings.h index c4ac2925feb..e64440961a4 100644 --- a/dbms/src/Core/Settings.h +++ b/dbms/src/Core/Settings.h @@ -388,7 +388,7 @@ struct Settings : public SettingsCollection \ M(SettingBool, enable_scalar_subquery_optimization, true, "If it is set to true, prevent scalar subqueries from (de)serializing large scalar values and possibly avoid running the same subquery more than once.", 0) \ M(SettingBool, optimize_trivial_count_query, true, "Process trivial 'SELECT count() FROM table' query from metadata.", 0) \ - M(SettingUInt64, mutation_synchronous_wait_timeout, 0, "Seconds to wait for synchronous execution of ALTER TABLE UPDATE/DELETE queries (mutations). After execute asynchronously. 0 - execute asynchronously from the start.", 0) \ + M(SettingUInt64, mutations_sync, 0, "Wait for synchronous execution of ALTER TABLE UPDATE/DELETE queries (mutations). 0 - execute asynchronously. 1 - wait current server. 2 - wait all replicas if they exist.", 0) \ \ /** Obsolete settings that do nothing but left for compatibility reasons. Remove each one after half a year of obsolescence. */ \ \ diff --git a/dbms/src/Storages/MergeTree/MergeTreeMutationEntry.h b/dbms/src/Storages/MergeTree/MergeTreeMutationEntry.h index 2b3bde72552..f5c512617d3 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeMutationEntry.h +++ b/dbms/src/Storages/MergeTree/MergeTreeMutationEntry.h @@ -25,6 +25,7 @@ struct MergeTreeMutationEntry MergeTreePartInfo latest_failed_part_info; time_t latest_fail_time = 0; String latest_fail_reason; + int latest_fail_error_code = 0; /// Create a new entry and write it to a temporary file. MergeTreeMutationEntry(MutationCommands commands_, const String & path_prefix_, Int64 tmp_number); diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp index f93a99bf89b..1ae67e8a5a8 100644 --- a/dbms/src/Storages/StorageMergeTree.cpp +++ b/dbms/src/Storages/StorageMergeTree.cpp @@ -447,15 +447,14 @@ void StorageMergeTree::mutate(const MutationCommands & commands, const Context & LOG_INFO(log, "Added mutation: " << file_name); merging_mutating_task_handle->wake(); - size_t timeout = query_context.getSettingsRef().mutation_synchronous_wait_timeout; - /// If timeout is set, than we can wait - if (timeout != 0) + /// We have to wait mutation end + if (query_context.getSettingsRef().mutations_sync > 0) { - LOG_INFO(log, "Waiting mutation: " << file_name << " for " << timeout << " seconds"); + LOG_INFO(log, "Waiting mutation: " << file_name); auto check = [version, this]() { return isMutationDone(version); }; std::unique_lock lock(mutation_wait_mutex); - if (!mutation_wait_event.wait_for(lock, std::chrono::seconds{timeout}, check)) - throw Exception("Mutation " + file_name + " is not finished. Will be done asynchronously", ErrorCodes::UNFINISHED); + mutation_wait_event.wait(lock, check); + } } @@ -479,6 +478,10 @@ bool StorageMergeTree::isMutationDone(Int64 mutation_version) const { std::lock_guard lock(currently_processing_in_background_mutex); + /// Killed + if (!current_mutations_by_version.count(mutation_version)) + return true; + auto data_parts = getDataPartsVector(); for (const auto & data_part : data_parts) if (data_part->info.getDataVersion() < mutation_version) @@ -559,6 +562,7 @@ CancellationCode StorageMergeTree::killMutation(const String & mutation_id) global_context.getMergeList().cancelPartMutations({}, to_kill->block_number); to_kill->removeFile(); LOG_TRACE(log, "Cancelled part mutations and removed mutation file " << mutation_id); + mutation_wait_event.notify_all(); /// Maybe there is another mutation that was blocked by the killed one. Try to execute it immediately. merging_mutating_task_handle->wake(); diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index be8124b05ca..1bdd506dee9 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -310,87 +310,73 @@ bool StorageReplicatedMergeTree::checkFixedGranualrityInZookeeper() } -void StorageReplicatedMergeTree::waitForAllReplicasToStatisfyNodeCondition( - size_t timeout, const String & name_for_logging, - const String & replica_relative_node_path, CheckNodeCallback callback) const +void StorageReplicatedMergeTree::waitMutationToFinishOnReplicas( + const Strings & replicas, const String & mutation_id) const { - const auto operation_start = std::chrono::system_clock::now(); - std::chrono::milliseconds total_time{timeout * 1000}; + if (replicas.empty()) + return; + zkutil::EventPtr wait_event = std::make_shared(); - Strings replicas = getZooKeeper()->getChildren(zookeeper_path + "/replicas"); + + std::set inactive_replicas; - std::set timed_out_replicas; for (const String & replica : replicas) { - LOG_DEBUG(log, "Waiting for " << replica << " to apply " + name_for_logging); - bool operation_is_processed_by_relica = false; + LOG_DEBUG(log, "Waiting for " << replica << " to apply mutation " + mutation_id); + while (!partial_shutdown_called) { + /// Mutation maybe killed or whole replica was deleted. + /// Wait event will unblock at this moment. + Coordination::Stat exists_stat; + if (!getZooKeeper()->exists(zookeeper_path + "/mutations/" + mutation_id, &exists_stat, wait_event)) + { + LOG_WARNING(log, "Mutation " << mutation_id << " was killed or manually removed. Nothing to wait."); + return; + } + auto zookeeper = getZooKeeper(); /// Replica could be inactive. if (!zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/is_active")) { - LOG_WARNING(log, "Replica " << replica << " is not active during mutation query." - << name_for_logging << " will be done asynchronously when replica becomes active."); + LOG_WARNING(log, "Replica " << replica << " is not active during mutation. " + "Mutation will be done asynchronously when replica becomes active."); inactive_replicas.emplace(replica); break; } - String node_for_check = zookeeper_path + "/replicas/" + replica + "/" + replica_relative_node_path; - std::string node_for_check_value; - Coordination::Stat stat; + String mutation_pointer = zookeeper_path + "/replicas/" + replica + "/mutation_pointer"; + std::string mutation_pointer_value; + Coordination::Stat get_stat; /// Replica could be removed - if (!zookeeper->tryGet(node_for_check, node_for_check_value, &stat, wait_event)) + if (!zookeeper->tryGet(mutation_pointer, mutation_pointer_value, &get_stat, wait_event)) { LOG_WARNING(log, replica << " was removed"); - operation_is_processed_by_relica = true; break; } - else /// in other case check required node - { - if (callback(node_for_check_value)) - { - operation_is_processed_by_relica = true; - break; /// operation is done - } - } + else if (mutation_pointer_value >= mutation_id) /// Maybe we already processed more fresh mutation + break; /// (numbers like 0000000000 and 0000000001) - std::chrono::milliseconds time_spent = - std::chrono::duration_cast(std::chrono::system_clock::now() - operation_start); - std::chrono::milliseconds time_left = total_time - time_spent; - - /// We have some time to wait - if (time_left.count() > 0) - wait_event->tryWait(time_left.count()); - else /// Otherwise time is up - break; + /// We wait without timeout. + wait_event->wait(); } if (partial_shutdown_called) - throw Exception(name_for_logging + " is not finished because table shutdown was called. " + name_for_logging + " will be done after table restart.", + throw Exception("Mutation is not finished because table shutdown was called. It will be done after table restart.", ErrorCodes::UNFINISHED); - - if (!operation_is_processed_by_relica && !inactive_replicas.count(replica)) - timed_out_replicas.emplace(replica); } - if (!inactive_replicas.empty() || !timed_out_replicas.empty()) + if (!inactive_replicas.empty()) { std::stringstream exception_message; - exception_message << name_for_logging << " is not finished because"; + exception_message << "Mutation is not finished because"; if (!inactive_replicas.empty()) exception_message << " some replicas are inactive right now: " << boost::algorithm::join(inactive_replicas, ", "); - if (!timed_out_replicas.empty() && !inactive_replicas.empty()) - exception_message << " and"; - - if (!timed_out_replicas.empty()) - exception_message << " timeout when waiting for some replicas: " << boost::algorithm::join(timed_out_replicas, ", "); - - exception_message << ". " << name_for_logging << " will be done asynchronously"; + exception_message << ". Mutation will be done asynchronously"; throw Exception(exception_message.str(), ErrorCodes::UNFINISHED); } @@ -3382,7 +3368,7 @@ void StorageReplicatedMergeTree::alter( time_t replication_alter_columns_timeout = query_context.getSettingsRef().replication_alter_columns_timeout; - /// This code is quite similar with waitForAllReplicasToStatisfyNodeCondition + /// This code is quite similar with waitMutationToFinishOnReplicas /// but contains more complicated details (versions manipulations, multiple nodes, etc.). /// It will be removed soon in favor of alter-modify implementation on top of mutations. /// TODO (alesap) @@ -4603,17 +4589,21 @@ void StorageReplicatedMergeTree::mutate(const MutationCommands & commands, const throw Coordination::Exception("Unable to create a mutation znode", rc); } - if (query_context.getSettingsRef().mutation_synchronous_wait_timeout != 0) /// some timeout specified + /// we have to wait + if (query_context.getSettingsRef().mutations_sync != 0) { auto check_callback = [mutation_number = entry.znode_name](const String & zk_value) { - /// Maybe we already processed more fresh mutation - /// We can compare their znode names (numbers like 0000000000 and 0000000001). return zk_value >= mutation_number; }; - waitForAllReplicasToStatisfyNodeCondition( - query_context.getSettingsRef().mutation_synchronous_wait_timeout, "Mutation", "mutation_pointer", check_callback); + Strings replicas; + if (query_context.getSettingsRef().mutations_sync == 2) /// wait for all replicas + replicas = getZooKeeper()->getChildren(zookeeper_path + "/replicas"); + else if (query_context.getSettingsRef().mutations_sync == 1) /// just wait for ourself + replicas.push_back(replica_path); + + waitMutationToFinishOnReplicas(replicas, entry.znode_name); } } diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.h b/dbms/src/Storages/StorageReplicatedMergeTree.h index 55957439a1c..90b18a07eec 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.h +++ b/dbms/src/Storages/StorageReplicatedMergeTree.h @@ -532,14 +532,9 @@ private: /// return true if it's fixed bool checkFixedGranualrityInZookeeper(); - using CheckNodeCallback = std::function; - - /// Wait for timeout seconds when condition became true for node - /// /replicas/{replica}/replica_replative_node_path value for all replicas. - /// operation_name_for_logging used for logging about errors. - void waitForAllReplicasToStatisfyNodeCondition( - size_t timeout, const String & operaton_name_for_logging, - const String & replica_relative_node_path, CheckNodeCallback condition) const; + /// Wait for timeout seconds mutation is finished on replicas + void waitMutationToFinishOnReplicas( + const Strings & replicas, const String & mutation_id) const; protected: /** If not 'attach', either creates a new table in ZK, or adds a replica to an existing table. diff --git a/dbms/tests/queries/0_stateless/00834_kill_mutation.sh b/dbms/tests/queries/0_stateless/00834_kill_mutation.sh index 03369dbff7a..726764c654b 100755 --- a/dbms/tests/queries/0_stateless/00834_kill_mutation.sh +++ b/dbms/tests/queries/0_stateless/00834_kill_mutation.sh @@ -14,27 +14,29 @@ ${CLICKHOUSE_CLIENT} --query="INSERT INTO test.kill_mutation VALUES ('2001-01-01 ${CLICKHOUSE_CLIENT} --query="SELECT '*** Create and kill a single invalid mutation ***'" -${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.kill_mutation DELETE WHERE toUInt32(s) = 1" +${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.kill_mutation DELETE WHERE toUInt32(s) = 1 SETTINGS mutations_sync = 1" & sleep 0.1 ${CLICKHOUSE_CLIENT} --query="SELECT mutation_id, latest_failed_part IN ('20000101_1_1_0', '20010101_2_2_0'), latest_fail_time != 0, substr(latest_fail_reason, 1, 8) FROM system.mutations WHERE database = 'test' AND table = 'kill_mutation'" ${CLICKHOUSE_CLIENT} --query="KILL MUTATION WHERE database = 'test' AND table = 'kill_mutation'" +wait + ${CLICKHOUSE_CLIENT} --query="SELECT mutation_id FROM system.mutations WHERE database = 'test' AND table = 'kill_mutation'" ${CLICKHOUSE_CLIENT} --query="SELECT '*** Create and kill invalid mutation that blocks another mutation ***'" ${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.kill_mutation DELETE WHERE toUInt32(s) = 1" -${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.kill_mutation DELETE WHERE x = 1" +${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.kill_mutation DELETE WHERE x = 1 SETTINGS mutations_sync = 1" & ${CLICKHOUSE_CLIENT} --query="SELECT mutation_id, latest_failed_part IN ('20000101_1_1_0', '20010101_2_2_0'), latest_fail_time != 0, substr(latest_fail_reason, 1, 8) FROM system.mutations WHERE database = 'test' AND table = 'kill_mutation' AND mutation_id = 'mutation_4.txt'" sleep 0.1 ${CLICKHOUSE_CLIENT} --query="KILL MUTATION WHERE database = 'test' AND table = 'kill_mutation' AND mutation_id = 'mutation_4.txt'" -wait_for_mutation "kill_mutation" "mutation_5.txt" "test" +wait ${CLICKHOUSE_CLIENT} --query="SELECT * FROM test.kill_mutation" diff --git a/dbms/tests/queries/0_stateless/00834_kill_mutation_replicated_zookeeper.reference b/dbms/tests/queries/0_stateless/00834_kill_mutation_replicated_zookeeper.reference index 3db1b92953c..9f1aa8e599d 100644 --- a/dbms/tests/queries/0_stateless/00834_kill_mutation_replicated_zookeeper.reference +++ b/dbms/tests/queries/0_stateless/00834_kill_mutation_replicated_zookeeper.reference @@ -1,6 +1,7 @@ *** Create and kill a single invalid mutation *** 0000000000 1 1 Code: 6, waiting test kill_mutation_r1 0000000000 +Mutation 0000000000 was killed *** Create and kill invalid mutation that blocks another mutation *** 0000000001 1 1 Code: 6, waiting test kill_mutation_r1 0000000001 diff --git a/dbms/tests/queries/0_stateless/00834_kill_mutation_replicated_zookeeper.sh b/dbms/tests/queries/0_stateless/00834_kill_mutation_replicated_zookeeper.sh index 570e4ea785b..3d0887c71b3 100755 --- a/dbms/tests/queries/0_stateless/00834_kill_mutation_replicated_zookeeper.sh +++ b/dbms/tests/queries/0_stateless/00834_kill_mutation_replicated_zookeeper.sh @@ -18,12 +18,24 @@ ${CLICKHOUSE_CLIENT} --query="INSERT INTO test.kill_mutation_r1 VALUES ('2001-01 ${CLICKHOUSE_CLIENT} --query="SELECT '*** Create and kill a single invalid mutation ***'" # wrong mutation -${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.kill_mutation_r1 DELETE WHERE toUInt32(s) = 1 SETTINGS mutation_synchronous_wait_timeout=2" 2>/dev/null +${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.kill_mutation_r1 DELETE WHERE toUInt32(s) = 1 SETTINGS mutations_sync=2" 2>&1 | grep -o "Mutation 0000000000 was killed" & -${CLICKHOUSE_CLIENT} --query="SELECT mutation_id, latest_failed_part IN ('20000101_0_0_0', '20010101_0_0_0'), latest_fail_time != 0, substr(latest_fail_reason, 1, 8) FROM system.mutations WHERE database = 'test' AND table = 'kill_mutation_r1'" +check_query1="SELECT mutation_id, latest_failed_part IN ('20000101_0_0_0', '20010101_0_0_0'), latest_fail_time != 0, substr(latest_fail_reason, 1, 8) FROM system.mutations WHERE database = 'test' AND table = 'kill_mutation_r1'" + +query_result=`$CLICKHOUSE_CLIENT --query="$check_query1" 2>&1` + +while [ -z "$query_result" ] +do + query_result=`$CLICKHOUSE_CLIENT --query="$check_query1" 2>&1` + sleep 0.1 +done + +$CLICKHOUSE_CLIENT --query="$check_query1" ${CLICKHOUSE_CLIENT} --query="KILL MUTATION WHERE database = 'test' AND table = 'kill_mutation_r1'" +wait + ${CLICKHOUSE_CLIENT} --query="SELECT mutation_id FROM system.mutations WHERE database = 'test' AND table = 'kill_mutation_r1'" @@ -32,13 +44,24 @@ ${CLICKHOUSE_CLIENT} --query="SELECT '*** Create and kill invalid mutation that ${CLICKHOUSE_CLIENT} --query="SYSTEM SYNC REPLICA test.kill_mutation_r1" ${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.kill_mutation_r1 DELETE WHERE toUInt32(s) = 1" -${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.kill_mutation_r1 DELETE WHERE x = 1 SETTINGS mutation_synchronous_wait_timeout=2" 2>/dev/null +# good mutation, but blocked with wrong mutation +${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.kill_mutation_r1 DELETE WHERE x = 1 SETTINGS mutations_sync=2" & -${CLICKHOUSE_CLIENT} --query="SELECT mutation_id, latest_failed_part IN ('20000101_0_0_0_1', '20010101_0_0_0_1'), latest_fail_time != 0, substr(latest_fail_reason, 1, 8) FROM system.mutations WHERE database = 'test' AND table = 'kill_mutation_r1' AND mutation_id = '0000000001'" +check_query2="SELECT mutation_id, latest_failed_part IN ('20000101_0_0_0_1', '20010101_0_0_0_1'), latest_fail_time != 0, substr(latest_fail_reason, 1, 8) FROM system.mutations WHERE database = 'test' AND table = 'kill_mutation_r1' AND mutation_id = '0000000001'" + +query_result=`$CLICKHOUSE_CLIENT --query="$check_query2" 2>&1` + +while [ -z "$query_result" ] +do + query_result=`$CLICKHOUSE_CLIENT --query="$check_query2" 2>&1` + sleep 0.1 +done + +$CLICKHOUSE_CLIENT --query="$check_query2" ${CLICKHOUSE_CLIENT} --query="KILL MUTATION WHERE database = 'test' AND table = 'kill_mutation_r1' AND mutation_id = '0000000001'" -wait_for_mutation "kill_mutation_r2" "0000000002" "test" +wait ${CLICKHOUSE_CLIENT} --query="SELECT * FROM test.kill_mutation_r2" diff --git a/dbms/tests/queries/0_stateless/01049_zookeeper_synchronous_mutations.reference b/dbms/tests/queries/0_stateless/01049_zookeeper_synchronous_mutations.reference index d65c89faf0e..1b9855f14a9 100644 --- a/dbms/tests/queries/0_stateless/01049_zookeeper_synchronous_mutations.reference +++ b/dbms/tests/queries/0_stateless/01049_zookeeper_synchronous_mutations.reference @@ -2,9 +2,7 @@ Replicated 1 1 1 -1 Normal 1 1 1 -1 diff --git a/dbms/tests/queries/0_stateless/01049_zookeeper_synchronous_mutations.sql b/dbms/tests/queries/0_stateless/01049_zookeeper_synchronous_mutations.sql index 3b4bb145f1a..269b568b5f6 100644 --- a/dbms/tests/queries/0_stateless/01049_zookeeper_synchronous_mutations.sql +++ b/dbms/tests/queries/0_stateless/01049_zookeeper_synchronous_mutations.sql @@ -11,14 +11,12 @@ INSERT INTO table_for_synchronous_mutations1 select number, number from numbers( SYSTEM SYNC REPLICA table_for_synchronous_mutations2; -ALTER TABLE table_for_synchronous_mutations1 UPDATE v1 = v1 + 1 WHERE 1 SETTINGS mutation_synchronous_wait_timeout = 10; +ALTER TABLE table_for_synchronous_mutations1 UPDATE v1 = v1 + 1 WHERE 1 SETTINGS mutations_sync = 2; SELECT is_done FROM system.mutations where table = 'table_for_synchronous_mutations1'; -ALTER TABLE table_for_synchronous_mutations1 UPDATE v1 = 1 WHERE ignore(sleep(3)) SETTINGS mutation_synchronous_wait_timeout = 2; --{serverError 341} - -- Another mutation, just to be sure, that previous finished -ALTER TABLE table_for_synchronous_mutations1 UPDATE v1 = v1 + 1 WHERE 1 SETTINGS mutation_synchronous_wait_timeout = 15; +ALTER TABLE table_for_synchronous_mutations1 UPDATE v1 = v1 + 1 WHERE 1 SETTINGS mutations_sync = 2; SELECT is_done FROM system.mutations where table = 'table_for_synchronous_mutations1'; @@ -33,16 +31,13 @@ CREATE TABLE table_for_synchronous_mutations_no_replication(k UInt32, v1 UInt64) INSERT INTO table_for_synchronous_mutations_no_replication select number, number from numbers(100000); -ALTER TABLE table_for_synchronous_mutations_no_replication UPDATE v1 = v1 + 1 WHERE 1 SETTINGS mutation_synchronous_wait_timeout = 10; +ALTER TABLE table_for_synchronous_mutations_no_replication UPDATE v1 = v1 + 1 WHERE 1 SETTINGS mutations_sync = 2; SELECT is_done FROM system.mutations where table = 'table_for_synchronous_mutations_no_replication'; -ALTER TABLE table_for_synchronous_mutations_no_replication UPDATE v1 = 1 WHERE ignore(sleep(3)) SETTINGS mutation_synchronous_wait_timeout = 2; --{serverError 341} - -- Another mutation, just to be sure, that previous finished -ALTER TABLE table_for_synchronous_mutations_no_replication UPDATE v1 = v1 + 1 WHERE 1 SETTINGS mutation_synchronous_wait_timeout = 15; +ALTER TABLE table_for_synchronous_mutations_no_replication UPDATE v1 = v1 + 1 WHERE 1 SETTINGS mutations_sync = 2; SELECT is_done FROM system.mutations where table = 'table_for_synchronous_mutations_no_replication'; - DROP TABLE IF EXISTS table_for_synchronous_mutations_no_replication;