diff --git a/dbms/src/Core/Settings.h b/dbms/src/Core/Settings.h index 00d7efb4a5b..74199925bec 100644 --- a/dbms/src/Core/Settings.h +++ b/dbms/src/Core/Settings.h @@ -388,6 +388,7 @@ struct Settings : public SettingsCollection \ M(SettingBool, enable_scalar_subquery_optimization, true, "If it is set to true, prevent scalar subqueries from (de)serializing large scalar values and possibly avoid running the same subquery more than once.", 0) \ M(SettingBool, optimize_trivial_count_query, true, "Process trivial 'SELECT count() FROM table' query from metadata.", 0) \ + M(SettingUInt64, mutation_synchronous_wait_timeout, 0, "Seconds to wait for synchronous execution of ALTER TABLE UPDATE/DELETE queries (mutations). After execute asynchronously. 0 - execute asynchronously from the begging.", 0) \ \ /** Obsolete settings that do nothing but left for compatibility reasons. Remove each one after half a year of obsolescence. */ \ \ diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp index 41c9335de1e..f93a99bf89b 100644 --- a/dbms/src/Storages/StorageMergeTree.cpp +++ b/dbms/src/Storages/StorageMergeTree.cpp @@ -42,6 +42,7 @@ namespace ErrorCodes extern const int PART_IS_TEMPORARILY_LOCKED; extern const int UNKNOWN_SETTING; extern const int TOO_BIG_AST; + extern const int UNFINISHED; } namespace ActionLocks @@ -425,17 +426,18 @@ public: }; -void StorageMergeTree::mutate(const MutationCommands & commands, const Context &) +void StorageMergeTree::mutate(const MutationCommands & commands, const Context & query_context) { /// Choose any disk, because when we load mutations we search them at each disk /// where storage can be placed. See loadMutations(). auto disk = storage_policy->getAnyDisk(); MergeTreeMutationEntry entry(commands, getFullPathOnDisk(disk), insert_increment.get()); String file_name; + Int64 version; { std::lock_guard lock(currently_processing_in_background_mutex); - Int64 version = increment.get(); + version = increment.get(); entry.commit(version); file_name = entry.file_name; auto insertion = current_mutations_by_id.emplace(file_name, std::move(entry)); @@ -444,6 +446,17 @@ void StorageMergeTree::mutate(const MutationCommands & commands, const Context & LOG_INFO(log, "Added mutation: " << file_name); merging_mutating_task_handle->wake(); + + size_t timeout = query_context.getSettingsRef().mutation_synchronous_wait_timeout; + /// If timeout is set, than we can wait + if (timeout != 0) + { + LOG_INFO(log, "Waiting mutation: " << file_name << " for " << timeout << " seconds"); + auto check = [version, this]() { return isMutationDone(version); }; + std::unique_lock lock(mutation_wait_mutex); + if (!mutation_wait_event.wait_for(lock, std::chrono::seconds{timeout}, check)) + throw Exception("Mutation " + file_name + " is not finished. Will be done asynchronously", ErrorCodes::UNFINISHED); + } } namespace @@ -462,6 +475,17 @@ bool comparator(const PartVersionWithName & f, const PartVersionWithName & s) } +bool StorageMergeTree::isMutationDone(Int64 mutation_version) const +{ + std::lock_guard lock(currently_processing_in_background_mutex); + + auto data_parts = getDataPartsVector(); + for (const auto & data_part : data_parts) + if (data_part->info.getDataVersion() < mutation_version) + return false; + return true; +} + std::vector StorageMergeTree::getMutationsStatus() const { std::lock_guard lock(currently_processing_in_background_mutex); @@ -771,6 +795,9 @@ bool StorageMergeTree::tryMutatePart() renameTempPartAndReplace(new_part); tagger->is_successful = true; write_part_log({}); + + /// Notify all, who wait for this or previous mutations + mutation_wait_event.notify_all(); } catch (...) { diff --git a/dbms/src/Storages/StorageMergeTree.h b/dbms/src/Storages/StorageMergeTree.h index dfef9261145..5972717e980 100644 --- a/dbms/src/Storages/StorageMergeTree.h +++ b/dbms/src/Storages/StorageMergeTree.h @@ -79,6 +79,10 @@ public: private: + /// Mutex and condvar for synchronous mutations wait + std::mutex mutation_wait_mutex; + std::condition_variable mutation_wait_event; + MergeTreeDataSelectExecutor reader; MergeTreeDataWriter writer; MergeTreeDataMergerMutator merger_mutator; @@ -138,6 +142,8 @@ private: void replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, const Context & context); bool partIsAssignedToBackgroundOperation(const DataPartPtr & part) const override; + /// Just checks versions of each active data part + bool isMutationDone(Int64 mutation_version) const; friend class MergeTreeBlockOutputStream; friend class MergeTreeData; diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index b3f69d463f2..be8124b05ca 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -54,6 +54,7 @@ #include #include +#include namespace ProfileEvents { @@ -309,6 +310,92 @@ bool StorageReplicatedMergeTree::checkFixedGranualrityInZookeeper() } +void StorageReplicatedMergeTree::waitForAllReplicasToStatisfyNodeCondition( + size_t timeout, const String & name_for_logging, + const String & replica_relative_node_path, CheckNodeCallback callback) const +{ + const auto operation_start = std::chrono::system_clock::now(); + std::chrono::milliseconds total_time{timeout * 1000}; + zkutil::EventPtr wait_event = std::make_shared(); + Strings replicas = getZooKeeper()->getChildren(zookeeper_path + "/replicas"); + std::set inactive_replicas; + std::set timed_out_replicas; + for (const String & replica : replicas) + { + LOG_DEBUG(log, "Waiting for " << replica << " to apply " + name_for_logging); + + bool operation_is_processed_by_relica = false; + while (!partial_shutdown_called) + { + auto zookeeper = getZooKeeper(); + /// Replica could be inactive. + if (!zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/is_active")) + { + LOG_WARNING(log, "Replica " << replica << " is not active during mutation query." + << name_for_logging << " will be done asynchronously when replica becomes active."); + + inactive_replicas.emplace(replica); + break; + } + + String node_for_check = zookeeper_path + "/replicas/" + replica + "/" + replica_relative_node_path; + std::string node_for_check_value; + Coordination::Stat stat; + /// Replica could be removed + if (!zookeeper->tryGet(node_for_check, node_for_check_value, &stat, wait_event)) + { + LOG_WARNING(log, replica << " was removed"); + operation_is_processed_by_relica = true; + break; + } + else /// in other case check required node + { + if (callback(node_for_check_value)) + { + operation_is_processed_by_relica = true; + break; /// operation is done + } + } + + std::chrono::milliseconds time_spent = + std::chrono::duration_cast(std::chrono::system_clock::now() - operation_start); + std::chrono::milliseconds time_left = total_time - time_spent; + + /// We have some time to wait + if (time_left.count() > 0) + wait_event->tryWait(time_left.count()); + else /// Otherwise time is up + break; + } + + if (partial_shutdown_called) + throw Exception(name_for_logging + " is not finished because table shutdown was called. " + name_for_logging + " will be done after table restart.", + ErrorCodes::UNFINISHED); + + if (!operation_is_processed_by_relica && !inactive_replicas.count(replica)) + timed_out_replicas.emplace(replica); + } + + if (!inactive_replicas.empty() || !timed_out_replicas.empty()) + { + std::stringstream exception_message; + exception_message << name_for_logging << " is not finished because"; + + if (!inactive_replicas.empty()) + exception_message << " some replicas are inactive right now: " << boost::algorithm::join(inactive_replicas, ", "); + + if (!timed_out_replicas.empty() && !inactive_replicas.empty()) + exception_message << " and"; + + if (!timed_out_replicas.empty()) + exception_message << " timeout when waiting for some replicas: " << boost::algorithm::join(timed_out_replicas, ", "); + + exception_message << ". " << name_for_logging << " will be done asynchronously"; + + throw Exception(exception_message.str(), ErrorCodes::UNFINISHED); + } +} + void StorageReplicatedMergeTree::createNewZooKeeperNodes() { auto zookeeper = getZooKeeper(); @@ -3200,6 +3287,7 @@ void StorageReplicatedMergeTree::alter( int32_t new_version = -1; /// Initialization is to suppress (useless) false positive warning found by cppcheck. }; + /// /columns and /metadata nodes std::vector changed_nodes; { @@ -3294,6 +3382,10 @@ void StorageReplicatedMergeTree::alter( time_t replication_alter_columns_timeout = query_context.getSettingsRef().replication_alter_columns_timeout; + /// This code is quite similar with waitForAllReplicasToStatisfyNodeCondition + /// but contains more complicated details (versions manipulations, multiple nodes, etc.). + /// It will be removed soon in favor of alter-modify implementation on top of mutations. + /// TODO (alesap) for (const String & replica : replicas) { LOG_DEBUG(log, "Waiting for " << replica << " to apply changes"); @@ -3396,8 +3488,16 @@ void StorageReplicatedMergeTree::alter( if (replica_nodes_changed_concurrently) continue; - /// Now wait for replica nodes to change. - + /// alter_query_event subscribed with zookeeper watch callback to /repliacs/{replica}/metadata + /// and /replicas/{replica}/columns nodes for current relica + shared nodes /columns and /metadata, + /// which is common for all replicas. If changes happen with this nodes (delete, set and create) + /// than event will be notified and wait will be interrupted. + /// + /// ReplicatedMergeTreeAlterThread responsible for local /replicas/{replica}/metadata and + /// /replicas/{replica}/columns changes. Shared /columns and /metadata nodes can be changed by *newer* + /// concurrent alter from other replica. First of all it will update shared nodes and we will have no + /// ability to identify, that our *current* alter finshed. So we cannot do anything better than just + /// return from *current* alter with success result. if (!replication_alter_columns_timeout) { alter_query_event->wait(); @@ -4399,7 +4499,7 @@ void StorageReplicatedMergeTree::fetchPartition(const ASTPtr & partition, const } -void StorageReplicatedMergeTree::mutate(const MutationCommands & commands, const Context &) +void StorageReplicatedMergeTree::mutate(const MutationCommands & commands, const Context & query_context) { /// Overview of the mutation algorithm. /// @@ -4502,6 +4602,20 @@ void StorageReplicatedMergeTree::mutate(const MutationCommands & commands, const else throw Coordination::Exception("Unable to create a mutation znode", rc); } + + if (query_context.getSettingsRef().mutation_synchronous_wait_timeout != 0) /// some timeout specified + { + auto check_callback = [mutation_number = entry.znode_name](const String & zk_value) + { + /// Maybe we already processed more fresh mutation + /// We can compare their znode names (numbers like 0000000000 and 0000000001). + return zk_value >= mutation_number; + }; + + waitForAllReplicasToStatisfyNodeCondition( + query_context.getSettingsRef().mutation_synchronous_wait_timeout, "Mutation", "mutation_pointer", check_callback); + } + } std::vector StorageReplicatedMergeTree::getMutationsStatus() const diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.h b/dbms/src/Storages/StorageReplicatedMergeTree.h index 3727d5de0d8..55957439a1c 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.h +++ b/dbms/src/Storages/StorageReplicatedMergeTree.h @@ -532,6 +532,15 @@ private: /// return true if it's fixed bool checkFixedGranualrityInZookeeper(); + using CheckNodeCallback = std::function; + + /// Wait for timeout seconds when condition became true for node + /// /replicas/{replica}/replica_replative_node_path value for all replicas. + /// operation_name_for_logging used for logging about errors. + void waitForAllReplicasToStatisfyNodeCondition( + size_t timeout, const String & operaton_name_for_logging, + const String & replica_relative_node_path, CheckNodeCallback condition) const; + protected: /** If not 'attach', either creates a new table in ZK, or adds a replica to an existing table. */ diff --git a/dbms/tests/queries/0_stateless/01049_zookeeper_synchronous_mutations.reference b/dbms/tests/queries/0_stateless/01049_zookeeper_synchronous_mutations.reference new file mode 100644 index 00000000000..d65c89faf0e --- /dev/null +++ b/dbms/tests/queries/0_stateless/01049_zookeeper_synchronous_mutations.reference @@ -0,0 +1,10 @@ +Replicated +1 +1 +1 +1 +Normal +1 +1 +1 +1 diff --git a/dbms/tests/queries/0_stateless/01049_zookeeper_synchronous_mutations.sql b/dbms/tests/queries/0_stateless/01049_zookeeper_synchronous_mutations.sql new file mode 100644 index 00000000000..ede0ee22a50 --- /dev/null +++ b/dbms/tests/queries/0_stateless/01049_zookeeper_synchronous_mutations.sql @@ -0,0 +1,48 @@ +DROP TABLE IF EXISTS table_for_synchronous_mutations1; +DROP TABLE IF EXISTS table_for_synchronous_mutations2; + +SELECT 'Replicated'; + +CREATE TABLE table_for_synchronous_mutations1(k UInt32, v1 UInt64) ENGINE ReplicatedMergeTree('/clickhouse/tables/table_for_synchronous_mutations', '1') ORDER BY k PARTITION BY modulo(k, 2); + +CREATE TABLE table_for_synchronous_mutations2(k UInt32, v1 UInt64) ENGINE ReplicatedMergeTree('/clickhouse/tables/table_for_synchronous_mutations', '2') ORDER BY k PARTITION BY modulo(k, 2); + +INSERT INTO table_for_synchronous_mutations1 select number, number from numbers(100000); + +SYSTEM SYNC REPLICA table_for_synchronous_mutations2; + +ALTER TABLE table_for_synchronous_mutations1 UPDATE v1 = v1 + 1 WHERE 1 SETTINGS mutation_synchronous_wait_timeout = 10; + +SELECT is_done FROM system.mutations where table = 'table_for_synchronous_mutations1'; + +ALTER TABLE table_for_synchronous_mutations1 UPDATE v1 = 1 WHERE ignore(sleep(3)) SETTINGS mutation_synchronous_wait_timeout = 2; --{serverError 341} + +-- Another mutation, just to be sure, that previous finished +ALTER TABLE table_for_synchronous_mutations1 UPDATE v1 = v1 + 1 WHERE 1 SETTINGS mutation_synchronous_wait_timeout = 10; + +SELECT is_done FROM system.mutations where table = 'table_for_synchronous_mutations1'; + +DROP TABLE IF EXISTS table_for_synchronous_mutations1; +DROP TABLE IF EXISTS table_for_synchronous_mutations2; + +SELECT 'Normal'; + +DROP TABLE IF EXISTS table_for_synchronous_mutations_no_replication; + +CREATE TABLE table_for_synchronous_mutations_no_replication(k UInt32, v1 UInt64) ENGINE MergeTree ORDER BY k PARTITION BY modulo(k, 2); + +INSERT INTO table_for_synchronous_mutations_no_replication select number, number from numbers(100000); + +ALTER TABLE table_for_synchronous_mutations_no_replication UPDATE v1 = v1 + 1 WHERE 1 SETTINGS mutation_synchronous_wait_timeout = 10; + +SELECT is_done FROM system.mutations where table = 'table_for_synchronous_mutations_no_replication'; + +ALTER TABLE table_for_synchronous_mutations_no_replication UPDATE v1 = 1 WHERE ignore(sleep(3)) SETTINGS mutation_synchronous_wait_timeout = 2; --{serverError 341} + +-- Another mutation, just to be sure, that previous finished +ALTER TABLE table_for_synchronous_mutations_no_replication UPDATE v1 = v1 + 1 WHERE 1 SETTINGS mutation_synchronous_wait_timeout = 10; + +SELECT is_done FROM system.mutations where table = 'table_for_synchronous_mutations_no_replication'; + + +DROP TABLE IF EXISTS table_for_synchronous_mutations_no_replication;