diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index 0f1afe1bd62..6a2823031f2 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -622,6 +622,100 @@ void StorageMergeTree::loadMutations() increment.value = std::max(Int64(increment.value.load()), current_mutations_by_version.rbegin()->first); } +std::optional StorageMergeTree::selectPartsToMerge(const StorageMetadataPtr &, bool aggressive, const String & partition_id, bool final, String * out_disable_reason) +{ + std::unique_lock lock(currently_processing_in_background_mutex); + auto data_settings = getSettings(); + + FutureMergedMutatedPart future_part; + + /// You must call destructor with unlocked `currently_processing_in_background_mutex`. + CurrentlyMergingPartsTaggerPtr merging_tagger; + MergeList::EntryPtr merge_entry; + + auto can_merge = [this, &lock] (const DataPartPtr & left, const DataPartPtr & right, String *) -> bool + { + /// This predicate is checked for the first part of each partition. + /// (left = nullptr, right = "first part of partition") + if (!left) + return !currently_merging_mutating_parts.count(right); + return !currently_merging_mutating_parts.count(left) && !currently_merging_mutating_parts.count(right) + && getCurrentMutationVersion(left, lock) == getCurrentMutationVersion(right, lock); + }; + + bool selected = false; + + if (partition_id.empty()) + { + UInt64 max_source_parts_size = merger_mutator.getMaxSourcePartsSizeForMerge(); + bool merge_with_ttl_allowed = getTotalMergesWithTTLInMergeList() < data_settings->max_number_of_merges_with_ttl_in_pool; + + /// TTL requirements is much more strict than for regular merge, so + /// if regular not possible, than merge with ttl is not also not + /// possible. + if (max_source_parts_size > 0) + { + selected = merger_mutator.selectPartsToMerge( + future_part, + aggressive, + max_source_parts_size, + can_merge, + merge_with_ttl_allowed, + out_disable_reason); + } + else if (out_disable_reason) + *out_disable_reason = "Current value of max_source_parts_size is zero"; + } + else + { + while (true) + { + UInt64 disk_space = getStoragePolicy()->getMaxUnreservedFreeSpace(); + selected = merger_mutator.selectAllPartsToMergeWithinPartition( + future_part, disk_space, can_merge, partition_id, final, out_disable_reason); + + /// If final - we will wait for currently processing merges to finish and continue. + /// TODO Respect query settings for timeout + if (final + && !selected + && !currently_merging_mutating_parts.empty() + && out_disable_reason + && out_disable_reason->empty()) + { + LOG_DEBUG(log, "Waiting for currently running merges ({} parts are merging right now) to perform OPTIMIZE FINAL", + currently_merging_mutating_parts.size()); + + if (std::cv_status::timeout == currently_processing_in_background_condition.wait_for( + lock, std::chrono::seconds(DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC))) + { + *out_disable_reason = "Timeout while waiting for already running merges before running OPTIMIZE with FINAL"; + break; + } + } + else + break; + } + } + + if (!selected) + { + if (out_disable_reason) + { + if (!out_disable_reason->empty()) + { + *out_disable_reason += ". "; + } + *out_disable_reason += "Cannot select parts for optimization"; + } + + return {}; + } + + merging_tagger = std::make_unique(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace(future_part.parts), *this, false); + auto table_id = getStorageID(); + merge_entry = global_context.getMergeList().insert(table_id.database_name, table_id.table_name, future_part); + return MergeMutateSelectedEntry{future_part, std::move(merging_tagger), std::move(merge_entry), {}}; +} bool StorageMergeTree::merge( bool aggressive, @@ -632,99 +726,12 @@ bool StorageMergeTree::merge( { auto table_lock_holder = lockForShare(RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations); auto metadata_snapshot = getInMemoryMetadataPtr(); - auto data_settings = getSettings(); - FutureMergedMutatedPart future_part; - - /// You must call destructor with unlocked `currently_processing_in_background_mutex`. - std::optional merging_tagger; - MergeList::EntryPtr merge_entry; - - { - std::unique_lock lock(currently_processing_in_background_mutex); - - auto can_merge = [this, &lock] (const DataPartPtr & left, const DataPartPtr & right, String *) -> bool - { - /// This predicate is checked for the first part of each partition. - /// (left = nullptr, right = "first part of partition") - if (!left) - return !currently_merging_mutating_parts.count(right); - return !currently_merging_mutating_parts.count(left) && !currently_merging_mutating_parts.count(right) - && getCurrentMutationVersion(left, lock) == getCurrentMutationVersion(right, lock); - }; - - bool selected = false; - - if (partition_id.empty()) - { - UInt64 max_source_parts_size = merger_mutator.getMaxSourcePartsSizeForMerge(); - bool merge_with_ttl_allowed = getTotalMergesWithTTLInMergeList() < data_settings->max_number_of_merges_with_ttl_in_pool; - - /// TTL requirements is much more strict than for regular merge, so - /// if regular not possible, than merge with ttl is not also not - /// possible. - if (max_source_parts_size > 0) - { - selected = merger_mutator.selectPartsToMerge( - future_part, - aggressive, - max_source_parts_size, - can_merge, - merge_with_ttl_allowed, - out_disable_reason); - } - else if (out_disable_reason) - *out_disable_reason = "Current value of max_source_parts_size is zero"; - } - else - { - while (true) - { - UInt64 disk_space = getStoragePolicy()->getMaxUnreservedFreeSpace(); - selected = merger_mutator.selectAllPartsToMergeWithinPartition( - future_part, disk_space, can_merge, partition_id, final, out_disable_reason); - - /// If final - we will wait for currently processing merges to finish and continue. - /// TODO Respect query settings for timeout - if (final - && !selected - && !currently_merging_mutating_parts.empty() - && out_disable_reason - && out_disable_reason->empty()) - { - LOG_DEBUG(log, "Waiting for currently running merges ({} parts are merging right now) to perform OPTIMIZE FINAL", - currently_merging_mutating_parts.size()); - - if (std::cv_status::timeout == currently_processing_in_background_condition.wait_for( - lock, std::chrono::seconds(DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC))) - { - *out_disable_reason = "Timeout while waiting for already running merges before running OPTIMIZE with FINAL"; - break; - } - } - else - break; - } - } - - if (!selected) - { - if (out_disable_reason) - { - if (!out_disable_reason->empty()) - { - *out_disable_reason += ". "; - } - *out_disable_reason += "Cannot select parts for optimization"; - } - return false; - } - - merging_tagger.emplace(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace(future_part.parts), *this, metadata_snapshot, false); - auto table_id = getStorageID(); - merge_entry = global_context.getMergeList().insert(table_id.database_name, table_id.table_name, future_part); - } + auto merge_mutate_entry = selectPartsToMerge(metadata_snapshot, aggressive, partition_id, final, out_disable_reason); + if (!merge_mutate_entry) + return false; + auto & future_part = merge_mutate_entry->future_part; /// Logging Stopwatch stopwatch; MutableDataPartPtr new_part; @@ -738,14 +745,14 @@ bool StorageMergeTree::merge( future_part.name, new_part, future_part.parts, - merge_entry.get()); + merge_mutate_entry->merge_entry.get()); }; try { new_part = merger_mutator.mergePartsToTemporaryPart( - future_part, metadata_snapshot, *merge_entry, table_lock_holder, time(nullptr), global_context, - merging_tagger->reserved_space, deduplicate); + future_part, metadata_snapshot, *(merge_mutate_entry->merge_entry), table_lock_holder, time(nullptr), + merge_mutate_entry->tagger->reserved_space, deduplicate); merger_mutator.renameMergedTemporaryPart(new_part, future_part.parts, nullptr); write_part_log({}); @@ -783,95 +790,98 @@ BackgroundProcessingPoolTaskResult StorageMergeTree::movePartsTask() } } +std::optional StorageMergeTree::selectPartsToMutate(const StorageMetadataPtr & metadata_snapshot, String */* disable_reason */) +{ + std::lock_guard lock(currently_processing_in_background_mutex); + size_t max_ast_elements = global_context.getSettingsRef().max_expanded_ast_elements; + + FutureMergedMutatedPart future_part; + MutationCommands commands; + + CurrentlyMergingPartsTaggerPtr tagger; + + if (current_mutations_by_version.empty()) + return {}; + + auto mutations_end_it = current_mutations_by_version.end(); + for (const auto & part : getDataPartsVector()) + { + if (currently_merging_mutating_parts.count(part)) + continue; + + auto mutations_begin_it = current_mutations_by_version.upper_bound(part->info.getDataVersion()); + if (mutations_begin_it == mutations_end_it) + continue; + + size_t max_source_part_size = merger_mutator.getMaxSourcePartSizeForMutation(); + if (max_source_part_size < part->getBytesOnDisk()) + { + LOG_DEBUG(log, "Current max source part size for mutation is {} but part size {}. Will not mutate part {}. " + "Max size depends not only on available space, but also on settings " + "'number_of_free_entries_in_pool_to_execute_mutation' and 'background_pool_size'", + max_source_part_size, part->getBytesOnDisk(), part->name); + continue; + } + + size_t current_ast_elements = 0; + for (auto it = mutations_begin_it; it != mutations_end_it; ++it) + { + size_t commands_size = 0; + MutationCommands commands_for_size_validation; + for (const auto & command : it->second.commands) + { + if (command.type != MutationCommand::Type::DROP_COLUMN + && command.type != MutationCommand::Type::DROP_INDEX + && command.type != MutationCommand::Type::RENAME_COLUMN) + { + commands_for_size_validation.push_back(command); + } + else + { + commands_size += command.ast->size(); + } + } + + if (!commands_for_size_validation.empty()) + { + MutationsInterpreter interpreter( + shared_from_this(), metadata_snapshot, commands_for_size_validation, global_context, false); + commands_size += interpreter.evaluateCommandsSize(); + } + + if (current_ast_elements + commands_size >= max_ast_elements) + break; + + current_ast_elements += commands_size; + commands.insert(commands.end(), it->second.commands.begin(), it->second.commands.end()); + } + + auto new_part_info = part->info; + new_part_info.mutation = current_mutations_by_version.rbegin()->first; + + future_part.parts.push_back(part); + future_part.part_info = new_part_info; + future_part.name = part->getNewName(new_part_info); + future_part.type = part->getType(); + + tagger = std::make_unique(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace({part}), *this, true); + auto table_id = getStorageID(); + MergeList::EntryPtr merge_entry = global_context.getMergeList().insert(table_id.database_name, table_id.table_name, future_part); + return MergeMutateSelectedEntry{future_part, std::move(tagger), std::move(merge_entry), commands}; + } + return {}; +} bool StorageMergeTree::tryMutatePart() { auto table_lock_holder = lockForShare(RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations); StorageMetadataPtr metadata_snapshot = getInMemoryMetadataPtr(); - size_t max_ast_elements = global_context.getSettingsRef().max_expanded_ast_elements; - FutureMergedMutatedPart future_part; - MutationCommands commands; - /// You must call destructor with unlocked `currently_processing_in_background_mutex`. - std::optional tagger; - { - std::lock_guard lock(currently_processing_in_background_mutex); - - if (current_mutations_by_version.empty()) - return false; - - auto mutations_end_it = current_mutations_by_version.end(); - for (const auto & part : getDataPartsVector()) - { - if (currently_merging_mutating_parts.count(part)) - continue; - - auto mutations_begin_it = current_mutations_by_version.upper_bound(part->info.getDataVersion()); - if (mutations_begin_it == mutations_end_it) - continue; - - size_t max_source_part_size = merger_mutator.getMaxSourcePartSizeForMutation(); - if (max_source_part_size < part->getBytesOnDisk()) - { - LOG_DEBUG(log, "Current max source part size for mutation is {} but part size {}. Will not mutate part {}. " - "Max size depends not only on available space, but also on settings " - "'number_of_free_entries_in_pool_to_execute_mutation' and 'background_pool_size'", - max_source_part_size, part->getBytesOnDisk(), part->name); - continue; - } - - size_t current_ast_elements = 0; - for (auto it = mutations_begin_it; it != mutations_end_it; ++it) - { - size_t commands_size = 0; - MutationCommands commands_for_size_validation; - for (const auto & command : it->second.commands) - { - if (command.type != MutationCommand::Type::DROP_COLUMN - && command.type != MutationCommand::Type::DROP_INDEX - && command.type != MutationCommand::Type::RENAME_COLUMN) - { - commands_for_size_validation.push_back(command); - } - else - { - commands_size += command.ast->size(); - } - } - - if (!commands_for_size_validation.empty()) - { - MutationsInterpreter interpreter( - shared_from_this(), metadata_snapshot, commands_for_size_validation, global_context, false); - commands_size += interpreter.evaluateCommandsSize(); - } - - if (current_ast_elements + commands_size >= max_ast_elements) - break; - - current_ast_elements += commands_size; - commands.insert(commands.end(), it->second.commands.begin(), it->second.commands.end()); - } - - auto new_part_info = part->info; - new_part_info.mutation = current_mutations_by_version.rbegin()->first; - - future_part.parts.push_back(part); - future_part.part_info = new_part_info; - future_part.name = part->getNewName(new_part_info); - future_part.type = part->getType(); - - tagger.emplace(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace({part}), *this, metadata_snapshot, true); - break; - } - } - - if (!tagger) + auto merge_mutate_entry = selectPartsToMutate(metadata_snapshot, nullptr); + if (!merge_mutate_entry) return false; - auto table_id = getStorageID(); - MergeList::EntryPtr merge_entry = global_context.getMergeList().insert(table_id.database_name, table_id.table_name, future_part); - + auto & future_part = merge_mutate_entry->future_part; Stopwatch stopwatch; MutableDataPartPtr new_part; @@ -884,14 +894,14 @@ bool StorageMergeTree::tryMutatePart() future_part.name, new_part, future_part.parts, - merge_entry.get()); + merge_mutate_entry->merge_entry.get()); }; try { new_part = merger_mutator.mutatePartToTemporaryPart( - future_part, metadata_snapshot, commands, *merge_entry, - time(nullptr), global_context, tagger->reserved_space, table_lock_holder); + future_part, metadata_snapshot, merge_mutate_entry->commands, *(merge_mutate_entry->merge_entry), + time(nullptr), global_context, merge_mutate_entry->tagger->reserved_space, table_lock_holder); renameTempPartAndReplace(new_part); diff --git a/src/Storages/StorageMergeTree.h b/src/Storages/StorageMergeTree.h index 5662f9e0088..9b740f575c2 100644 --- a/src/Storages/StorageMergeTree.h +++ b/src/Storages/StorageMergeTree.h @@ -21,6 +21,8 @@ namespace DB { +struct CurrentlyMergingPartsTagger; + /** See the description of the data structure in MergeTreeData. */ class StorageMergeTree final : public ext::shared_ptr_helper, public MergeTreeData @@ -140,6 +142,20 @@ private: /// Try and find a single part to mutate and mutate it. If some part was successfully mutated, return true. bool tryMutatePart(); + friend struct CurrentlyMergingPartsTagger; + + using CurrentlyMergingPartsTaggerPtr = std::unique_ptr; + + struct MergeMutateSelectedEntry + { + FutureMergedMutatedPart future_part; + CurrentlyMergingPartsTaggerPtr tagger; + MergeList::EntryPtr merge_entry; + MutationCommands commands; + }; + + std::optional selectPartsToMerge(const StorageMetadataPtr & metadata_snapshot, bool aggressive, const String & partition_id, bool final, String * disable_reason); + std::optional selectPartsToMutate(const StorageMetadataPtr & metadata_snapshot, String * disable_reason); BackgroundProcessingPoolTaskResult mergeMutateTask(); @@ -173,7 +189,7 @@ private: friend class MergeTreeBlockOutputStream; friend class MergeTreeData; - friend struct CurrentlyMergingPartsTagger; + protected: diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 2ac8ddb7846..ae30977a9f2 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -2537,16 +2537,8 @@ void StorageReplicatedMergeTree::mutationsUpdatingTask() } } - -BackgroundProcessingPoolTaskResult StorageReplicatedMergeTree::queueTask() +ReplicatedMergeTreeQueue::SelectedEntry StorageReplicatedMergeTree::selectQueueEntry() { - /// If replication queue is stopped exit immediately as we successfully executed the task - if (queue.actions_blocker.isCancelled()) - { - std::this_thread::sleep_for(std::chrono::milliseconds(5)); - return BackgroundProcessingPoolTaskResult::SUCCESS; - } - /// This object will mark the element of the queue as running. ReplicatedMergeTreeQueue::SelectedEntry selected; @@ -2559,14 +2551,14 @@ BackgroundProcessingPoolTaskResult StorageReplicatedMergeTree::queueTask() tryLogCurrentException(log, __PRETTY_FUNCTION__); } - LogEntryPtr & entry = selected.first; + return selected; +} - if (!entry) - return BackgroundProcessingPoolTaskResult::NOTHING_TO_DO; +bool StorageReplicatedMergeTree::processQueueEntry(ReplicatedMergeTreeQueue::SelectedEntry & selected_entry) +{ - time_t prev_attempt_time = entry->last_attempt_time; - - bool res = queue.processEntry([this]{ return getZooKeeper(); }, entry, [&](LogEntryPtr & entry_to_process) + LogEntryPtr & entry = selected_entry.first; + return queue.processEntry([this]{ return getZooKeeper(); }, entry, [&](LogEntryPtr & entry_to_process) { try { @@ -2605,6 +2597,28 @@ BackgroundProcessingPoolTaskResult StorageReplicatedMergeTree::queueTask() throw; } }); +} + +BackgroundProcessingPoolTaskResult StorageReplicatedMergeTree::queueTask() +{ + /// If replication queue is stopped exit immediately as we successfully executed the task + if (queue.actions_blocker.isCancelled()) + { + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + return BackgroundProcessingPoolTaskResult::SUCCESS; + } + + /// This object will mark the element of the queue as running. + ReplicatedMergeTreeQueue::SelectedEntry selected_entry = selectQueueEntry(); + + LogEntryPtr & entry = selected_entry.first; + + if (!entry) + return BackgroundProcessingPoolTaskResult::NOTHING_TO_DO; + + time_t prev_attempt_time = entry->last_attempt_time; + + bool res = processQueueEntry(selected_entry); /// We will go to sleep if the processing fails and if we have already processed this record recently. bool need_sleep = !res && (entry->last_attempt_time - prev_attempt_time < 10); diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index 0356cccd302..bb13cd7f230 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -416,6 +416,11 @@ private: /// Clone replica if it is lost. void cloneReplicaIfNeeded(zkutil::ZooKeeperPtr zookeeper); + + ReplicatedMergeTreeQueue::SelectedEntry selectQueueEntry(); + + bool processQueueEntry(ReplicatedMergeTreeQueue::SelectedEntry & entry); + /** Performs actions from the queue. */ BackgroundProcessingPoolTaskResult queueTask();