diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.cpp b/dbms/src/Storages/MergeTree/MergeTreeData.cpp
index 2a7d0fd26e5..3b983962bd6 100644
--- a/dbms/src/Storages/MergeTree/MergeTreeData.cpp
+++ b/dbms/src/Storages/MergeTree/MergeTreeData.cpp
@@ -425,7 +425,10 @@ Int64 MergeTreeData::getMaxBlockNumber()
 
     Int64 max_block_num = 0;
     for (const DataPartPtr & part : data_parts_by_info)
+    {
         max_block_num = std::max(max_block_num, part->info.max_block);
+        max_block_num = std::max(max_block_num, part->info.mutation);
+    }
 
     return max_block_num;
 }
diff --git a/dbms/src/Storages/MergeTree/MergeTreeMutationEntry.h b/dbms/src/Storages/MergeTree/MergeTreeMutationEntry.h
new file mode 100644
index 00000000000..0aaff6c4794
--- /dev/null
+++ b/dbms/src/Storages/MergeTree/MergeTreeMutationEntry.h
@@ -0,0 +1,22 @@
+#pragma once
+
+#include
+#include
+
+
+namespace DB
+{
+
+/// A mutation entry for non-replicated MergeTree storage engines.
+struct MergeTreeMutationEntry
+{
+    /// Set by StorageMergeTree::mutate() to the time at which the mutation was registered.
+    time_t create_time = 0;
+
+    /// Version of the mutation; StorageMergeTree::mutate() takes it from the table's block number counter.
+    Int64 block_number = 0;
+    /// Commands applied to every part that gets mutated.
+    MutationCommands commands;
+};
+
+}
diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp
index d2ba37e6992..df2aa375938 100644
--- a/dbms/src/Storages/StorageMergeTree.cpp
+++ b/dbms/src/Storages/StorageMergeTree.cpp
@@ -61,7 +61,7 @@ StorageMergeTree::StorageMergeTree(
          context_, primary_expr_ast_, secondary_sorting_expr_list_, date_column_name, partition_expr_ast_,
          sampling_expression_, merging_params_,
          settings_, false, attach),
-    reader(data), writer(data), merger(data, context.getBackgroundPool()),
+    reader(data), writer(data), merger_mutator(data, context.getBackgroundPool()),
     log(&Logger::get(database_name_ + "." + table_name + " (StorageMergeTree)"))
 {
     if (path_.empty())
@@ -94,7 +94,7 @@ void StorageMergeTree::shutdown()
     if (shutdown_called)
         return;
     shutdown_called = true;
-    merger.actions_blocker.cancelForever();
+    merger_mutator.actions_blocker.cancelForever();
     if (merge_task_handle)
         background_pool.removeTask(merge_task_handle);
 }
@@ -139,7 +139,7 @@ void StorageMergeTree::truncate(const ASTPtr &)
 {
     /// Asks to complete merges and does not allow them to start.
     /// This protects against "revival" of data for a removed partition after completion of merge.
-    auto merge_blocker = merger.actions_blocker.cancel();
+    auto merge_blocker = merger_mutator.actions_blocker.cancel();
 
     /// NOTE: It's assumed that this method is called under lockForAlter.
 
@@ -173,7 +173,7 @@ void StorageMergeTree::alter(
     const Context & context)
 {
     /// NOTE: Here, as in ReplicatedMergeTree, you can do ALTER which does not block the writing of data for a long time.
-    auto merge_blocker = merger.actions_blocker.cancel();
+    auto merge_blocker = merger_mutator.actions_blocker.cancel();
 
     auto table_soft_lock = lockDataForAlter(__PRETTY_FUNCTION__);
 
@@ -284,6 +284,114 @@ struct CurrentlyMergingPartsTagger
     }
 };
 
+
+/// Applies `commands` to the data parts of the table, one part per iteration, and returns when no part
+/// is left below the version of this mutation.
+///
+/// The mutation stays registered in current_mutations_by_version for as long as this call runs.
+/// A part is skipped, and retried after a one-second pause, while it is still below the preceding
+/// registered mutation or is present in currently_merging.
+void StorageMergeTree::mutate(const MutationCommands & commands, const Context & context)
+{
+    using MutationIterator = decltype(current_mutations_by_version.end());
+
+    /// Unregisters the mutation on every exit path, exceptions included. A stale entry would keep
+    /// affecting getCurrentMutationVersion(), which merge() and every later mutate() rely on.
+    struct MutationUnregisterer
+    {
+        StorageMergeTree & storage;
+        MutationIterator it;
+
+        ~MutationUnregisterer()
+        {
+            std::lock_guard<std::mutex> lock(storage.currently_merging_mutex);
+            storage.current_mutations_by_version.erase(it);
+        }
+    };
+
+    Int64 version;
+    MutationIterator mutation_it;
+    {
+        std::lock_guard<std::mutex> lock(currently_merging_mutex);
+
+        version = increment.get();
+
+        MergeTreeMutationEntry entry;
+        entry.create_time = time(nullptr);
+        entry.block_number = version;
+        entry.commands = commands;
+
+        mutation_it = current_mutations_by_version.emplace(version, std::move(entry));
+    }
+    MutationUnregisterer unregisterer{*this, mutation_it};
+
+    size_t parts_mutated = 0;
+    while (true)
+    {
+        std::optional<CurrentlyMergingPartsTagger> tagger;
+        MergeTreeDataMergerMutator::FuturePart future_mutated_part;
+        bool some_locked = false;
+        {
+            std::lock_guard<std::mutex> lock(currently_merging_mutex);
+
+            /// Key of the preceding entry in current_mutations_by_version, or 0 if this one is the first.
+            Int64 prev_version = 0;
+            if (mutation_it != current_mutations_by_version.begin())
+                prev_version = std::prev(mutation_it)->first;
+
+            for (const auto & part : data.getDataPartsVector())
+            {
+                Int64 part_mutation_version = getCurrentMutationVersion(part, lock);
+
+                /// Already at (or past) this mutation.
+                if (part_mutation_version >= version)
+                    continue;
+
+                if (part_mutation_version < prev_version)
+                {
+                    LOG_TRACE(log,
+                        "Part " << part->name << " has mutation version " << part_mutation_version
+                        << ", will wait until it has version " << prev_version);
+                    some_locked = true;
+                    continue;
+                }
+
+                if (currently_merging.count(part))
+                {
+                    LOG_TRACE(log, "Part " << part->name << " is currently locked, will wait.");
+                    some_locked = true;
+                    continue;
+                }
+
+                auto new_part_info = part->info;
+                new_part_info.mutation = version;
+
+                future_mutated_part.parts.push_back(part);
+                future_mutated_part.part_info = new_part_info;
+                future_mutated_part.name = part->getNewName(new_part_info);
+                tagger.emplace({part}, part->bytes_on_disk, *this);
+                break;
+            }
+        }
+
+        /// The mutation itself runs without holding currently_merging_mutex.
+        if (!future_mutated_part.parts.empty())
+        {
+            auto new_part = merger_mutator.mutatePartToTemporaryPart(
+                future_mutated_part, mutation_it->second.commands, context);
+            data.renameTempPartAndReplace(new_part);
+            ++parts_mutated;
+        }
+        else if (some_locked)
+            sleep(1);
+        else
+            break;
+    }
+
+    LOG_TRACE(log, "Finished, mutated " << parts_mutated << " parts.");
+}
+
+
 bool StorageMergeTree::merge(
     size_t aio_threshold,
     bool aggressive,
@@ -311,22 +419,23 @@ bool StorageMergeTree::merge(
     {
         std::lock_guard<std::mutex> lock(currently_merging_mutex);
 
-        auto can_merge = [this] (const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right, String *)
+        auto can_merge = [this, &lock] (const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right, String *)
         {
-            return !currently_merging.count(left) && !currently_merging.count(right);
+            return !currently_merging.count(left) && !currently_merging.count(right)
+                && getCurrentMutationVersion(left, lock) == getCurrentMutationVersion(right, lock);
         };
 
         bool selected = false;
 
         if (partition_id.empty())
         {
-            size_t max_source_parts_size = merger.getMaxSourcePartsSize();
+            size_t max_source_parts_size = merger_mutator.getMaxSourcePartsSize();
             if (max_source_parts_size > 0)
-                selected = merger.selectPartsToMerge(future_part, aggressive, max_source_parts_size, can_merge, out_disable_reason);
+                selected = merger_mutator.selectPartsToMerge(future_part, aggressive, max_source_parts_size, can_merge, out_disable_reason);
         }
         else
         {
-            selected = merger.selectAllPartsToMergeWithinPartition(future_part, disk_space, can_merge, partition_id, final, out_disable_reason);
+            selected = merger_mutator.selectAllPartsToMergeWithinPartition(future_part, disk_space, can_merge, partition_id, final, out_disable_reason);
         }
 
         if (!selected)
@@ -385,9 +494,10 @@ bool StorageMergeTree::merge(
 
     try
     {
-        new_part = merger.mergePartsToTemporaryPart(future_part, *merge_entry, aio_threshold, time(nullptr),
-                                                    merging_tagger->reserved_space.get(), deduplicate);
-        merger.renameMergedTemporaryPart(new_part, future_part.parts, nullptr);
+        new_part = merger_mutator.mergePartsToTemporaryPart(
+            future_part, *merge_entry, aio_threshold, time(nullptr),
+            merging_tagger->reserved_space.get(), deduplicate);
+        merger_mutator.renameMergedTemporaryPart(new_part, future_part.parts, nullptr);
 
         write_part_log({});
     }
@@ -406,7 +516,7 @@ bool StorageMergeTree::mergeTask()
     if (shutdown_called)
         return false;
 
-    if (merger.actions_blocker.isCancelled())
+    if (merger_mutator.actions_blocker.isCancelled())
         return false;
 
     try
@@ -426,12 +536,27 @@ bool StorageMergeTree::mergeTask()
     }
 }
 
+/// Returns the greatest key of current_mutations_by_version that does not exceed
+/// part->info.getDataVersion(), i.e. the latest registered mutation the part has already reached,
+/// or 0 if there is no such entry.
+/// The unused lock parameter makes callers show that they hold currently_merging_mutex.
+Int64 StorageMergeTree::getCurrentMutationVersion(
+    const MergeTreeData::DataPartPtr & part,
+    std::lock_guard<std::mutex> & /* currently_merging_mutex_lock */) const
+{
+    auto it = current_mutations_by_version.upper_bound(part->info.getDataVersion());
+    if (it == current_mutations_by_version.begin())
+        return 0;
+    --it;
+    return it->first;
+}
+
 
 void StorageMergeTree::clearColumnInPartition(const ASTPtr & partition, const Field & column_name, const Context & context)
 {
     /// Asks to complete merges and does not allow them to start.
     /// This protects against "revival" of data for a removed partition after completion of merge.
-    auto merge_blocker = merger.actions_blocker.cancel();
+    auto merge_blocker = merger_mutator.actions_blocker.cancel();
 
     /// We don't change table structure, only data in some parts, parts are locked inside alterDataPart() function
     auto lock_read_structure = lockStructure(false, __PRETTY_FUNCTION__);
@@ -516,7 +641,7 @@ void StorageMergeTree::dropPartition(const ASTPtr & /*query*/, const ASTPtr & pa
 {
     /// Asks to complete merges and does not allow them to start.
     /// This protects against "revival" of data for a removed partition after completion of merge.
-    auto merge_blocker = merger.actions_blocker.cancel();
+    auto merge_blocker = merger_mutator.actions_blocker.cancel();
     /// Waits for completion of merge and does not start new ones.
     auto lock = lockForAlter(__PRETTY_FUNCTION__);
 
@@ -672,7 +797,7 @@ void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, con
 ActionLock StorageMergeTree::getActionLock(StorageActionBlockType action_type)
 {
     if (action_type == ActionLocks::PartsMerge)
-        return merger.actions_blocker.cancel();
+        return merger_mutator.actions_blocker.cancel();
 
     return {};
 }
diff --git a/dbms/src/Storages/StorageMergeTree.h b/dbms/src/Storages/StorageMergeTree.h
index 66bcfc46263..72eb812b363 100644
--- a/dbms/src/Storages/StorageMergeTree.h
+++ b/dbms/src/Storages/StorageMergeTree.h
@@ -7,6 +7,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -70,6 +71,8 @@ public:
     void replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, const Context & context) override;
     void freezePartition(const ASTPtr & partition, const String & with_name, const Context & context) override;
 
+    void mutate(const MutationCommands & commands, const Context & context) override;
+
     void drop() override;
 
     void truncate(const ASTPtr &) override;
@@ -98,7 +101,7 @@ private:
     MergeTreeData data;
     MergeTreeDataSelectExecutor reader;
     MergeTreeDataWriter writer;
-    MergeTreeDataMergerMutator merger;
+    MergeTreeDataMergerMutator merger_mutator;
 
     /// For block numbers.
     SimpleIncrement increment{0};
@@ -106,8 +109,9 @@ private:
     /// For clearOldParts, clearOldTemporaryDirectories.
     AtomicStopwatch time_after_previous_cleanup;
 
-    MergeTreeData::DataParts currently_merging;
     std::mutex currently_merging_mutex;
+    MergeTreeData::DataParts currently_merging;
+    std::multimap<Int64, MergeTreeMutationEntry> current_mutations_by_version;
 
     Logger * log;
 
@@ -124,6 +128,10 @@ private:
 
     bool mergeTask();
 
+    Int64 getCurrentMutationVersion(
+        const MergeTreeData::DataPartPtr & part,
+        std::lock_guard<std::mutex> & /* currently_merging_mutex_lock */) const;
+
     friend class MergeTreeBlockOutputStream;
     friend class MergeTreeData;
     friend struct CurrentlyMergingPartsTagger;
diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp
index 199e23394a3..d9668050c2e 100644
--- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp
+++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp
@@ -4084,7 +4084,7 @@ void StorageReplicatedMergeTree::mutate(const MutationCommands & commands, const
             const String & path_created =
                 static_cast(responses[1].get())->path_created;
             entry.znode_name = path_created.substr(path_created.find_last_of('/') + 1);
-            LOG_TRACE(log, "Created mutation with id " << entry.znode_name);
+            LOG_TRACE(log, "Created mutation with ID " << entry.znode_name);
             break;
         }
         else if (rc == ZooKeeperImpl::ZooKeeper::ZBADVERSION)