diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index b20b5e19e1a..1a7062766b0 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -3630,7 +3630,7 @@ std::optional MergeTreeData::getDataMovingJob() if (moving_tagger->parts_to_move.empty()) return {}; - return JobAndPool{[this, moving_tagger{std::move(moving_tagger)}] () mutable + return JobAndPool{[this, moving_tagger] () mutable { moveParts(moving_tagger); }, PoolType::MOVE}; diff --git a/src/Storages/MergeTree/MergeTreePartsMover.h b/src/Storages/MergeTree/MergeTreePartsMover.h index 332a0988d10..a1afadec7fa 100644 --- a/src/Storages/MergeTree/MergeTreePartsMover.h +++ b/src/Storages/MergeTree/MergeTreePartsMover.h @@ -16,7 +16,7 @@ namespace DB struct MergeTreeMoveEntry { std::shared_ptr part; - std::shared_ptr reserved_space; + ReservationPtr reserved_space; MergeTreeMoveEntry(const std::shared_ptr & part_, ReservationPtr reservation_) : part(part_), reserved_space(std::move(reservation_)) diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index f0732774c3e..95358ecee97 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -1259,7 +1259,7 @@ ReplicatedMergeTreeQueue::CurrentlyExecuting::~CurrentlyExecuting() } -ReplicatedMergeTreeQueue::SelectedEntry ReplicatedMergeTreeQueue::selectEntryToProcess(MergeTreeDataMergerMutator & merger_mutator, MergeTreeData & data) +ReplicatedMergeTreeQueue::SelectedEntryPtr ReplicatedMergeTreeQueue::selectEntryToProcess(MergeTreeDataMergerMutator & merger_mutator, MergeTreeData & data) { LogEntryPtr entry; @@ -1286,7 +1286,7 @@ ReplicatedMergeTreeQueue::SelectedEntry ReplicatedMergeTreeQueue::selectEntryToP } if (entry) - return { entry, std::shared_ptr{ new CurrentlyExecuting(entry, *this) } }; + return std::make_shared(entry, std::unique_ptr{ new CurrentlyExecuting(entry, *this) }); else return {}; } diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h index c72569a5071..ead97579a4f 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h @@ -259,6 +259,8 @@ private: ~CurrentlyExecuting(); }; + using CurrentlyExecutingPtr = std::unique_ptr; + public: ReplicatedMergeTreeQueue(StorageReplicatedMergeTree & storage_); ~ReplicatedMergeTreeQueue(); @@ -319,8 +321,19 @@ public: /** Select the next action to process. * merger_mutator is used only to check if the merges are not suspended. */ - using SelectedEntry = std::pair>; - SelectedEntry selectEntryToProcess(MergeTreeDataMergerMutator & merger_mutator, MergeTreeData & data); + struct SelectedEntry + { + ReplicatedMergeTreeQueue::LogEntryPtr log_entry; + CurrentlyExecutingPtr currently_executing_holder; + + SelectedEntry(const ReplicatedMergeTreeQueue::LogEntryPtr & log_entry_, CurrentlyExecutingPtr && currently_executing_holder_) + : log_entry(log_entry_) + , currently_executing_holder(std::move(currently_executing_holder_)) + {} + }; + + using SelectedEntryPtr = std::shared_ptr; + SelectedEntryPtr selectEntryToProcess(MergeTreeDataMergerMutator & merger_mutator, MergeTreeData & data); /** Execute `func` function to handle the action. * In this case, at runtime, mark the queue element as running diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index 0e0cc13b0c7..11e8859e76c 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -309,72 +309,62 @@ void StorageMergeTree::alter( /// While exists, marks parts as 'currently_merging_mutating_parts' and reserves free space on filesystem. -struct CurrentlyMergingPartsTagger +StorageMergeTree::CurrentlyMergingPartsTagger::CurrentlyMergingPartsTagger( + FutureMergedMutatedPart & future_part_, + size_t total_size, + StorageMergeTree & storage_, + const StorageMetadataPtr & metadata_snapshot, + bool is_mutation) + : future_part(future_part_), storage(storage_) { - FutureMergedMutatedPart future_part; - ReservationPtr reserved_space; + /// Assume mutex is already locked, because this method is called from mergeTask. - StorageMergeTree & storage; - -public: - CurrentlyMergingPartsTagger( - FutureMergedMutatedPart & future_part_, - size_t total_size, - StorageMergeTree & storage_, - const StorageMetadataPtr & metadata_snapshot, - bool is_mutation) - : future_part(future_part_), storage(storage_) + /// if we mutate part, than we should reserve space on the same disk, because mutations possible can create hardlinks + if (is_mutation) + reserved_space = storage.tryReserveSpace(total_size, future_part_.parts[0]->volume); + else { - /// Assume mutex is already locked, because this method is called from mergeTask. + IMergeTreeDataPart::TTLInfos ttl_infos; + size_t max_volume_index = 0; + for (auto & part_ptr : future_part_.parts) + { + ttl_infos.update(part_ptr->ttl_infos); + max_volume_index = std::max(max_volume_index, storage.getStoragePolicy()->getVolumeIndexByDisk(part_ptr->volume->getDisk())); + } - /// if we mutate part, than we should reserve space on the same disk, because mutations possible can create hardlinks + reserved_space = storage.tryReserveSpacePreferringTTLRules(metadata_snapshot, total_size, ttl_infos, time(nullptr), max_volume_index); + } + if (!reserved_space) + { if (is_mutation) - reserved_space = storage.tryReserveSpace(total_size, future_part_.parts[0]->volume); + throw Exception("Not enough space for mutating part '" + future_part_.parts[0]->name + "'", ErrorCodes::NOT_ENOUGH_SPACE); else - { - IMergeTreeDataPart::TTLInfos ttl_infos; - size_t max_volume_index = 0; - for (auto & part_ptr : future_part_.parts) - { - ttl_infos.update(part_ptr->ttl_infos); - max_volume_index = std::max(max_volume_index, storage.getStoragePolicy()->getVolumeIndexByDisk(part_ptr->volume->getDisk())); - } - - reserved_space = storage.tryReserveSpacePreferringTTLRules(metadata_snapshot, total_size, ttl_infos, time(nullptr), max_volume_index); - } - if (!reserved_space) - { - if (is_mutation) - throw Exception("Not enough space for mutating part '" + future_part_.parts[0]->name + "'", ErrorCodes::NOT_ENOUGH_SPACE); - else - throw Exception("Not enough space for merging parts", ErrorCodes::NOT_ENOUGH_SPACE); - } - - future_part_.updatePath(storage, reserved_space); - - for (const auto & part : future_part.parts) - { - if (storage.currently_merging_mutating_parts.count(part)) - throw Exception("Tagging already tagged part " + part->name + ". This is a bug.", ErrorCodes::LOGICAL_ERROR); - } - storage.currently_merging_mutating_parts.insert(future_part.parts.begin(), future_part.parts.end()); + throw Exception("Not enough space for merging parts", ErrorCodes::NOT_ENOUGH_SPACE); } - ~CurrentlyMergingPartsTagger() + future_part_.updatePath(storage, reserved_space); + + for (const auto & part : future_part.parts) { - std::lock_guard lock(storage.currently_processing_in_background_mutex); - - for (const auto & part : future_part.parts) - { - if (!storage.currently_merging_mutating_parts.count(part)) - std::terminate(); - storage.currently_merging_mutating_parts.erase(part); - } - - storage.currently_processing_in_background_condition.notify_all(); + if (storage.currently_merging_mutating_parts.count(part)) + throw Exception("Tagging already tagged part " + part->name + ". This is a bug.", ErrorCodes::LOGICAL_ERROR); } -}; + storage.currently_merging_mutating_parts.insert(future_part.parts.begin(), future_part.parts.end()); +} +StorageMergeTree::CurrentlyMergingPartsTagger::~CurrentlyMergingPartsTagger() +{ + std::lock_guard lock(storage.currently_processing_in_background_mutex); + + for (const auto & part : future_part.parts) + { + if (!storage.currently_merging_mutating_parts.count(part)) + std::terminate(); + storage.currently_merging_mutating_parts.erase(part); + } + + storage.currently_processing_in_background_condition.notify_all(); +} Int64 StorageMergeTree::startMutation(const MutationCommands & commands, String & mutation_file_name) { @@ -643,7 +633,7 @@ void StorageMergeTree::loadMutations() increment.value = std::max(Int64(increment.value.load()), current_mutations_by_version.rbegin()->first); } -std::optional StorageMergeTree::selectPartsToMerge( +std::shared_ptr StorageMergeTree::selectPartsToMerge( const StorageMetadataPtr & metadata_snapshot, bool aggressive, const String & partition_id, bool final, String * out_disable_reason, TableLockHolder & /* table_lock_holder */) { std::unique_lock lock(currently_processing_in_background_mutex); @@ -733,8 +723,8 @@ std::optional StorageMergeTree::sele return {}; } - merging_tagger = std::make_shared(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace(future_part.parts), *this, metadata_snapshot, false); - return MergeMutateSelectedEntry{future_part, std::move(merging_tagger), {}}; + merging_tagger = std::make_unique(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace(future_part.parts), *this, metadata_snapshot, false); + return std::make_shared(future_part, std::move(merging_tagger), MutationCommands{}); } bool StorageMergeTree::merge( @@ -799,7 +789,7 @@ bool StorageMergeTree::partIsAssignedToBackgroundOperation(const DataPartPtr & p return currently_merging_mutating_parts.count(part); } -std::optional StorageMergeTree::selectPartsToMutate(const StorageMetadataPtr & metadata_snapshot, String */* disable_reason */, TableLockHolder & /* table_lock_holder */) +std::shared_ptr StorageMergeTree::selectPartsToMutate(const StorageMetadataPtr & metadata_snapshot, String */* disable_reason */, TableLockHolder & /* table_lock_holder */) { std::lock_guard lock(currently_processing_in_background_mutex); size_t max_ast_elements = global_context.getSettingsRef().max_expanded_ast_elements; @@ -873,8 +863,8 @@ std::optional StorageMergeTree::sele future_part.name = part->getNewName(new_part_info); future_part.type = part->getType(); - tagger = std::make_shared(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace({part}), *this, metadata_snapshot, true); - return MergeMutateSelectedEntry{future_part, std::move(tagger), commands}; + tagger = std::make_unique(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace({part}), *this, metadata_snapshot, true); + return std::make_shared(future_part, std::move(tagger), commands); } return {}; } @@ -930,7 +920,7 @@ std::optional StorageMergeTree::getDataProcessingJob() return {}; auto metadata_snapshot = getInMemoryMetadataPtr(); - std::optional merge_entry, mutate_entry; + std::shared_ptr merge_entry, mutate_entry; auto share_lock = lockForShare(RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations); merge_entry = selectPartsToMerge(metadata_snapshot, false, {}, false, nullptr, share_lock); @@ -939,7 +929,7 @@ std::optional StorageMergeTree::getDataProcessingJob() if (merge_entry || mutate_entry) { - return JobAndPool{[this, metadata_snapshot, merge_entry{std::move(merge_entry)}, mutate_entry{std::move(mutate_entry)}, share_lock] () mutable + return JobAndPool{[this, metadata_snapshot, merge_entry, mutate_entry, share_lock] () mutable { if (merge_entry) mergeSelectedParts(metadata_snapshot, false, *merge_entry, share_lock); diff --git a/src/Storages/StorageMergeTree.h b/src/Storages/StorageMergeTree.h index 43982ddbc78..71a32fbd203 100644 --- a/src/Storages/StorageMergeTree.h +++ b/src/Storages/StorageMergeTree.h @@ -21,8 +21,6 @@ namespace DB { -struct CurrentlyMergingPartsTagger; - /** See the description of the data structure in MergeTreeData. */ class StorageMergeTree final : public ext::shared_ptr_helper, public MergeTreeData @@ -140,21 +138,42 @@ private: /// Wait until mutation with version will finish mutation for all parts void waitForMutation(Int64 version, const String & file_name); - friend struct CurrentlyMergingPartsTagger; + struct CurrentlyMergingPartsTagger + { + FutureMergedMutatedPart future_part; + ReservationPtr reserved_space; - using CurrentlyMergingPartsTaggerPtr = std::shared_ptr; + StorageMergeTree & storage; + + CurrentlyMergingPartsTagger( + FutureMergedMutatedPart & future_part_, + size_t total_size, + StorageMergeTree & storage_, + const StorageMetadataPtr & metadata_snapshot, + bool is_mutation); + + ~CurrentlyMergingPartsTagger(); + }; + + using CurrentlyMergingPartsTaggerPtr = std::unique_ptr; + friend struct CurrentlyMergingPartsTagger; struct MergeMutateSelectedEntry { FutureMergedMutatedPart future_part; CurrentlyMergingPartsTaggerPtr tagger; MutationCommands commands; + MergeMutateSelectedEntry(const FutureMergedMutatedPart & future_part_, CurrentlyMergingPartsTaggerPtr && tagger_, const MutationCommands & commands_) + : future_part(future_part_) + , tagger(std::move(tagger_)) + , commands(commands_) + {} }; - std::optional selectPartsToMerge(const StorageMetadataPtr & metadata_snapshot, bool aggressive, const String & partition_id, bool final, String * disable_reason, TableLockHolder & table_lock_holder); + std::shared_ptr selectPartsToMerge(const StorageMetadataPtr & metadata_snapshot, bool aggressive, const String & partition_id, bool final, String * disable_reason, TableLockHolder & table_lock_holder); bool mergeSelectedParts(const StorageMetadataPtr & metadata_snapshot, bool deduplicate, MergeMutateSelectedEntry & entry, TableLockHolder & table_lock_holder); - std::optional selectPartsToMutate(const StorageMetadataPtr & metadata_snapshot, String * disable_reason, TableLockHolder & table_lock_holder); + std::shared_ptr selectPartsToMutate(const StorageMetadataPtr & metadata_snapshot, String * disable_reason, TableLockHolder & table_lock_holder); bool mutateSelectedPart(const StorageMetadataPtr & metadata_snapshot, MergeMutateSelectedEntry & entry, TableLockHolder & table_lock_holder); Int64 getCurrentMutationVersion( diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 5d670ab2edf..53f2ff14b3b 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -2540,10 +2540,10 @@ void StorageReplicatedMergeTree::mutationsUpdatingTask() } } -ReplicatedMergeTreeQueue::SelectedEntry StorageReplicatedMergeTree::selectQueueEntry() +ReplicatedMergeTreeQueue::SelectedEntryPtr StorageReplicatedMergeTree::selectQueueEntry() { /// This object will mark the element of the queue as running. - ReplicatedMergeTreeQueue::SelectedEntry selected; + ReplicatedMergeTreeQueue::SelectedEntryPtr selected; try { @@ -2557,10 +2557,10 @@ ReplicatedMergeTreeQueue::SelectedEntry StorageReplicatedMergeTree::selectQueueE return selected; } -bool StorageReplicatedMergeTree::processQueueEntry(ReplicatedMergeTreeQueue::SelectedEntry & selected_entry) +bool StorageReplicatedMergeTree::processQueueEntry(ReplicatedMergeTreeQueue::SelectedEntryPtr selected_entry) { - LogEntryPtr & entry = selected_entry.first; + LogEntryPtr & entry = selected_entry->log_entry; return queue.processEntry([this]{ return getZooKeeper(); }, entry, [&](LogEntryPtr & entry_to_process) { try @@ -2609,14 +2609,12 @@ std::optional StorageReplicatedMergeTree::getDataProcessingJob() return {}; /// This object will mark the element of the queue as running. - ReplicatedMergeTreeQueue::SelectedEntry selected_entry = selectQueueEntry(); + ReplicatedMergeTreeQueue::SelectedEntryPtr selected_entry = selectQueueEntry(); - LogEntryPtr & entry = selected_entry.first; - - if (!entry) + if (!selected_entry) return {}; - return JobAndPool{[this, selected_entry{std::move(selected_entry)}] () mutable + return JobAndPool{[this, selected_entry] () mutable { processQueueEntry(selected_entry); }, PoolType::MERGE_MUTATE}; diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index 277c1302540..92e17412ecc 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -418,9 +418,9 @@ private: void cloneReplicaIfNeeded(zkutil::ZooKeeperPtr zookeeper); - ReplicatedMergeTreeQueue::SelectedEntry selectQueueEntry(); + ReplicatedMergeTreeQueue::SelectedEntryPtr selectQueueEntry(); - bool processQueueEntry(ReplicatedMergeTreeQueue::SelectedEntry & entry); + bool processQueueEntry(ReplicatedMergeTreeQueue::SelectedEntryPtr entry); /// Postcondition: /// either leader_election is fully initialized (node in ZK is created and the watching thread is launched)