diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h index ab96011f88a..774d304b310 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h @@ -175,6 +175,12 @@ struct ReplicatedMergeTreeLogEntryData /// The quorum value (for GET_PART) is a non-zero value when the quorum write is enabled. size_t quorum = 0; + + /// Returns true if this MUTATE_PART entry was caused by an ALTER (MODIFY/DROP) query. + bool isAlterMutation() const + { + return type == MUTATE_PART && alter_version != -1; + } }; diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeMutationEntry.h b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeMutationEntry.h index dbbf4f98a71..432cca5e906 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeMutationEntry.h +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeMutationEntry.h @@ -45,6 +45,8 @@ struct ReplicatedMergeTreeMutationEntry /// Version of metadata. Not equal to -1 only if this mutation /// was created by ALTER MODIFY/DROP queries. 
int alter_version = -1; + + bool isAlterMutation() const { return alter_version != -1; } }; using ReplicatedMergeTreeMutationEntryPtr = std::shared_ptr; diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index c30a30ec85e..2a49a366bbb 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -698,7 +698,7 @@ void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, C } /// otherwise it's already done - if (entry->alter_version != -1 && entry->znode_name > mutation_pointer) + if (entry->isAlterMutation() && entry->znode_name > mutation_pointer) { LOG_TRACE(log, "Adding mutation " << entry->znode_name << " with alter version " << entry->alter_version << " to the queue"); alter_chain.addMutationForAlter(entry->alter_version, state_lock); @@ -743,7 +743,7 @@ ReplicatedMergeTreeMutationEntryPtr ReplicatedMergeTreeQueue::removeMutation( mutations_by_partition.erase(partition_and_block_num.first); } - if (entry->alter_version != -1) + if (entry->isAlterMutation()) { LOG_DEBUG(log, "Removed alter " << entry->alter_version << " because mutation " + entry->znode_name + " were killed."); alter_chain.finishDataAlter(entry->alter_version, state_lock); @@ -970,6 +970,8 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry( MergeTreeData & data, std::lock_guard & state_lock) const { + /// If our entry produces a part which is already covered by + /// some other entry which is currently executing, then we can postpone this entry. if (entry.type == LogEntry::MERGE_PARTS || entry.type == LogEntry::GET_PART || entry.type == LogEntry::MUTATE_PART) @@ -1053,6 +1055,8 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry( } } + /// Alters must be executed one by one. First metadata change, and after that data alter (MUTATE_PART entries with + /// corresponding alter_version). 
if (entry.type == LogEntry::ALTER_METADATA) { if (!alter_chain.canExecuteMetaAlter(entry.alter_version, state_lock)) @@ -1065,7 +1069,8 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry( } } - if (entry.type == LogEntry::MUTATE_PART && entry.alter_version != -1) + /// If this MUTATE_PART is part of alter modify/drop query, then we have to execute them one by one + if (entry.isAlterMutation()) { if (!alter_chain.canExecuteDataAlter(entry.alter_version, state_lock)) { @@ -1077,7 +1082,6 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry( out_postpone_reason = "Cannot execute alter data with version: " + std::to_string(entry.alter_version) + " because another alter " + std::to_string(head_alter) + " must be executed before"; - return false; } } @@ -1402,7 +1406,7 @@ bool ReplicatedMergeTreeQueue::tryFinalizeMutations(zkutil::ZooKeeperPtr zookeep { LOG_TRACE(log, "Mutation " << entry->znode_name << " is done"); it->second.is_done = true; - if (entry->alter_version != -1) + if (entry->isAlterMutation()) { LOG_TRACE(log, "Finishing data alter with version " << entry->alter_version << " for entry " << entry->znode_name); alter_chain.finishDataAlter(entry->alter_version, lock); diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h index 24b3cc467da..9d40f1b28c0 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h @@ -277,7 +277,7 @@ public: void updateMutations(zkutil::ZooKeeperPtr zookeeper, Coordination::WatchCallback watch_callback = {}); /// Remove a mutation from ZooKeeper and from the local set. Returns the removed entry or nullptr - /// if it could not be found. + /// if it could not be found. Called during KILL MUTATION query execution. 
ReplicatedMergeTreeMutationEntryPtr removeMutation(zkutil::ZooKeeperPtr zookeeper, const String & mutation_id); /** Remove the action from the queue with the parts covered by part_name (from ZK and from the RAM). @@ -395,8 +395,11 @@ public: const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right, String * out_reason = nullptr) const; - /// Return nonempty optional if the part can and should be mutated. - /// Returned mutation version number is always the biggest possible. + /// Return nonempty optional of desired mutation version and alter version. + /// If we have no alter (modify/drop) mutations in mutations queue, then we return the biggest possible + /// mutation version (and -1 as alter version). Otherwise, we return the biggest mutation version with + /// the smallest alter version. This is required because we have to execute alter mutations sequentially and + /// don't glue them together. Alter is a rare operation, so it shouldn't affect performance. std::optional> getDesiredMutationVersion(const MergeTreeData::DataPartPtr & part) const; bool isMutationFinished(const ReplicatedMergeTreeMutationEntry & mutation) const; diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 8861e62867d..9c9aaf1766d 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -976,10 +976,13 @@ bool StorageReplicatedMergeTree::executeLogEntry(LogEntry & entry) } else if (entry.type == LogEntry::MERGE_PARTS) { + /// Sometimes it's better to fetch the merged part instead of merging, + /// for example when we don't have all source parts for the merge. do_fetch = !tryExecuteMerge(entry); } else if (entry.type == LogEntry::MUTATE_PART) { + /// Sometimes it's better to fetch the mutated part instead of mutating. do_fetch = !tryExecutePartMutation(entry); } else if (entry.type == LogEntry::ALTER_METADATA) @@ -1284,6 +1287,7 @@ bool 
StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedM bool StorageReplicatedMergeTree::executeFetch(LogEntry & entry) { + /// Looking for covering part. After that entry.actual_new_part_name may be filled. String replica = findReplicaHavingCoveringPart(entry, true); const auto storage_settings_ptr = getSettings(); @@ -3219,6 +3223,7 @@ bool StorageReplicatedMergeTree::executeMetadataAlter(const StorageReplicatedMer zookeeper->multi(requests); { + /// TODO (relax this lock) auto table_lock = lockExclusively(RWLockImpl::NO_QUERY); LOG_INFO(log, "Metadata changed in ZooKeeper. Applying changes locally."); @@ -3229,7 +3234,7 @@ bool StorageReplicatedMergeTree::executeMetadataAlter(const StorageReplicatedMer LOG_INFO(log, "Applied changes to the metadata of the table. Current metadata version: " << metadata_version); } - /// This transaction may not happen, but it's ok, because on the next retry we will eventually create this node + /// This transaction may not happen, but it's OK, because on the next retry we will eventually create/update this node zookeeper->createOrUpdate(replica_path + "/metadata_version", std::to_string(metadata_version), zkutil::CreateMode::Persistent); recalculateColumnSizes(); @@ -3340,6 +3345,7 @@ void StorageReplicatedMergeTree::alter( std::optional lock_holder; /// No we will prepare mutations record + /// This code is pretty much the same as in the mutate() function, but it processes results slightly differently if (alter_entry->have_mutation) { String mutations_path = zookeeper_path + "/mutations"; @@ -3371,17 +3377,17 @@ void StorageReplicatedMergeTree::alter( { if (alter_entry->have_mutation) { - /// Record in replication /log + /// ALTER_METADATA record in replication /log String alter_path = dynamic_cast(*results[2]).path_created; alter_entry->znode_name = alter_path.substr(alter_path.find_last_of('/') + 1); - /// Record in /mutations + /// ReplicatedMergeTreeMutationEntry record in /mutations String mutation_path = 
dynamic_cast(*results.back()).path_created; mutation_znode = mutation_path.substr(mutation_path.find_last_of('/') + 1); } else { - /// Record in replication /log + /// ALTER_METADATA record in replication /log String alter_path = dynamic_cast(*results.back()).path_created; alter_entry->znode_name = alter_path.substr(alter_path.find_last_of('/') + 1); } @@ -3392,7 +3398,6 @@ void StorageReplicatedMergeTree::alter( if (dynamic_cast(*results[0]).error) throw Exception("Metadata on replica is not up to date with common metadata in Zookeeper. Cannot alter", ErrorCodes::CANNOT_ASSIGN_ALTER); - LOG_TRACE(log, "We have version conflict with inserts because of concurrent inserts. Will try to assign alter one more time."); continue; } else diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.h b/dbms/src/Storages/StorageReplicatedMergeTree.h index c43e863b66a..a5fa0bfa6c0 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.h +++ b/dbms/src/Storages/StorageReplicatedMergeTree.h @@ -366,11 +366,20 @@ private: /// Do the merge or recommend to make the fetch instead of the merge bool tryExecuteMerge(const LogEntry & entry); + /// Execute alter of table metadata. Set replica/metadata and replica/columns + /// nodes in zookeeper and also changes in-memory metadata. + /// New metadata and columns values are stored in entry. bool executeMetadataAlter(const LogEntry & entry); + /// Execute MUTATE_PART entry. Part name and mutation commands + /// are stored in entry. This function relies on MergerMutator class. bool tryExecutePartMutation(const LogEntry & entry); + /// Fetch part from other replica (inserted or merged/mutated) + /// NOTE: Attention! First of all tries to find covering part on other replica + /// and sets it into entry.actual_new_part_name. After that tries to fetch this new covering part. + /// If fetch was not successful, clears entry.actual_new_part_name. bool executeFetch(LogEntry & entry); void executeClearColumnOrIndexInPartition(const LogEntry & entry);