diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index a1045e98b03..4ea3c70faf7 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -619,6 +619,7 @@ void StorageMergeTree::loadMutations() std::optional StorageMergeTree::selectPartsToMerge(const StorageMetadataPtr &, bool aggressive, const String & partition_id, bool final, String * out_disable_reason) { + auto table_lock_holder = lockForShare(RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations); std::unique_lock lock(currently_processing_in_background_mutex); auto data_settings = getSettings(); @@ -718,15 +719,18 @@ bool StorageMergeTree::merge( bool deduplicate, String * out_disable_reason) { - auto table_lock_holder = lockForShare(RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations); auto metadata_snapshot = getInMemoryMetadataPtr(); auto merge_mutate_entry = selectPartsToMerge(metadata_snapshot, aggressive, partition_id, final, out_disable_reason); if (!merge_mutate_entry) return false; - auto & future_part = merge_mutate_entry->future_part; - /// Logging + return mergeSelectedParts(metadata_snapshot, deduplicate, *merge_mutate_entry); +} + +bool StorageMergeTree::mergeSelectedParts(const StorageMetadataPtr & metadata_snapshot, bool deduplicate, MergeMutateSelectedEntry & merge_mutate_entry) { + auto table_lock_holder = lockForShare(RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations); + auto & future_part = merge_mutate_entry.future_part; Stopwatch stopwatch; MutableDataPartPtr new_part; @@ -739,14 +743,14 @@ bool StorageMergeTree::merge( future_part.name, new_part, future_part.parts, - merge_mutate_entry->merge_entry.get()); + merge_mutate_entry.merge_entry.get()); }; try { new_part = merger_mutator.mergePartsToTemporaryPart( - future_part, metadata_snapshot, *(merge_mutate_entry->merge_entry), table_lock_holder, time(nullptr), - merge_mutate_entry->tagger->reserved_space, deduplicate); + future_part, metadata_snapshot, *(merge_mutate_entry.merge_entry), table_lock_holder, time(nullptr), + merge_mutate_entry.tagger->reserved_space, deduplicate); merger_mutator.renameMergedTemporaryPart(new_part, future_part.parts, nullptr); write_part_log({}); @@ -760,7 +764,6 @@ bool StorageMergeTree::merge( return true; } - bool StorageMergeTree::partIsAssignedToBackgroundOperation(const DataPartPtr & part) const { std::lock_guard background_processing_lock(currently_processing_in_background_mutex); @@ -786,6 +789,7 @@ BackgroundProcessingPoolTaskResult StorageMergeTree::movePartsTask() std::optional StorageMergeTree::selectPartsToMutate(const StorageMetadataPtr & metadata_snapshot, String */* disable_reason */) { + auto table_lock_holder = lockForShare(RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations); std::lock_guard lock(currently_processing_in_background_mutex); size_t max_ast_elements = global_context.getSettingsRef().max_expanded_ast_elements; @@ -868,14 +872,19 @@ std::optional StorageMergeTree::sele bool StorageMergeTree::tryMutatePart() { - auto table_lock_holder = lockForShare(RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations); StorageMetadataPtr metadata_snapshot = getInMemoryMetadataPtr(); auto merge_mutate_entry = selectPartsToMutate(metadata_snapshot, nullptr); if (!merge_mutate_entry) return false; - auto & future_part = merge_mutate_entry->future_part; + return mutateSelectedPart(metadata_snapshot, *merge_mutate_entry); +} + +bool StorageMergeTree::mutateSelectedPart(const StorageMetadataPtr & metadata_snapshot, MergeMutateSelectedEntry & merge_mutate_entry) +{ + auto table_lock_holder = lockForShare(RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations); + auto & future_part = merge_mutate_entry.future_part; Stopwatch stopwatch; MutableDataPartPtr new_part; @@ -888,14 +897,14 @@ bool StorageMergeTree::tryMutatePart() future_part.name, new_part, future_part.parts, - merge_mutate_entry->merge_entry.get()); + merge_mutate_entry.merge_entry.get()); }; try { new_part = merger_mutator.mutatePartToTemporaryPart( - future_part, metadata_snapshot, merge_mutate_entry->commands, *(merge_mutate_entry->merge_entry), - time(nullptr), global_context, merge_mutate_entry->tagger->reserved_space, table_lock_holder); + future_part, metadata_snapshot, merge_mutate_entry.commands, *(merge_mutate_entry.merge_entry), + time(nullptr), global_context, merge_mutate_entry.tagger->reserved_space, table_lock_holder); renameTempPartAndReplace(new_part); @@ -912,7 +921,6 @@ bool StorageMergeTree::tryMutatePart() return true; } - BackgroundProcessingPoolTaskResult StorageMergeTree::mergeMutateTask() { if (shutdown_called) diff --git a/src/Storages/StorageMergeTree.h b/src/Storages/StorageMergeTree.h index 9b740f575c2..ac6939bd900 100644 --- a/src/Storages/StorageMergeTree.h +++ b/src/Storages/StorageMergeTree.h @@ -155,7 +155,10 @@ private: }; std::optional selectPartsToMerge(const StorageMetadataPtr & metadata_snapshot, bool aggressive, const String & partition_id, bool final, String * disable_reason); + bool mergeSelectedParts(const StorageMetadataPtr & metadata_snapshot, bool deduplicate, MergeMutateSelectedEntry & entry); + std::optional selectPartsToMutate(const StorageMetadataPtr & metadata_snapshot, String * disable_reason); + bool mutateSelectedPart(const StorageMetadataPtr & metadata_snapshot, MergeMutateSelectedEntry & entry); BackgroundProcessingPoolTaskResult mergeMutateTask();