diff --git a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index 1a5c94a2e26..31fd2919a5d 100644 --- a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -333,6 +333,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge( SimpleMergeSelector::Settings merge_settings; /// Override value from table settings merge_settings.max_parts_to_merge_at_once = data_settings->max_parts_to_merge_at_once; + merge_settings.min_age_to_force_merge = data_settings->min_age_to_force_merge_seconds; if (aggressive) merge_settings.base = 1; @@ -350,6 +351,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge( *out_disable_reason = "There is no need to merge parts according to merge selector algorithm"; return SelectPartsDecision::CANNOT_SELECT; } + } MergeTreeData::DataPartsVector parts; diff --git a/src/Storages/MergeTree/MergeTreeSettings.h b/src/Storages/MergeTree/MergeTreeSettings.h index 63421c3de5d..2f2eda42970 100644 --- a/src/Storages/MergeTree/MergeTreeSettings.h +++ b/src/Storages/MergeTree/MergeTreeSettings.h @@ -62,8 +62,7 @@ struct Settings; M(UInt64, merge_tree_clear_old_temporary_directories_interval_seconds, 60, "The period of executing the clear old temporary directories operation in background.", 0) \ M(UInt64, merge_tree_clear_old_parts_interval_seconds, 1, "The period of executing the clear old parts operation in background.", 0) \ M(UInt64, merge_tree_clear_old_broken_detached_parts_ttl_timeout_seconds, 1ULL * 3600 * 24 * 30, "Remove old broken detached parts in the background if they remained intouched for a specified by this setting period of time.", 0) \ - M(UInt64, auto_optimize_partition_interval_seconds, 0, "The period of executing the auto optimize partitions in background. Set to 0 to disable.", 0) \ - M(UInt64, auto_optimize_partition_after_seconds, 0, "The number of seconds since last mutation required for partition to be automatically optimized. Set to 0 to disable.", 0) \ + M(UInt64, min_age_to_force_merge_seconds, 0, "If all parts in a certain range are older than this value, range will be always eligable for merging. Set to 0 to disable.", 0) \ M(UInt64, merge_tree_enable_clear_old_broken_detached, false, "Enable clearing old broken detached parts operation in background.", 0) \ M(Bool, remove_rolled_back_parts_immediately, 1, "Setting for an incomplete experimental feature.", 0) \ \ diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp index 0d021e39a71..e2b23d75746 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp @@ -155,8 +155,6 @@ bool ReplicatedMergeTreeRestartingThread::runImpl() storage.mutations_updating_task->activateAndSchedule(); storage.mutations_finalizing_task->activateAndSchedule(); storage.merge_selecting_task->activateAndSchedule(); - if (storage.auto_optimize_partition_task) - storage.auto_optimize_partition_task->activateAndSchedule(); storage.cleanup_thread.start(); storage.part_check_thread.start(); diff --git a/src/Storages/MergeTree/SimpleMergeSelector.cpp b/src/Storages/MergeTree/SimpleMergeSelector.cpp index 3b71e2720c8..f9ed6aedc60 100644 --- a/src/Storages/MergeTree/SimpleMergeSelector.cpp +++ b/src/Storages/MergeTree/SimpleMergeSelector.cpp @@ -102,6 +102,9 @@ bool allow( double max_size_to_lower_base_log, const SimpleMergeSelector::Settings & settings) { + if (settings.min_age_to_force_merge && min_age >= settings.min_age_to_force_merge) + return true; + // std::cerr << "sum_size: " << sum_size << "\n"; /// Map size to 0..1 using logarithmic scale diff --git a/src/Storages/MergeTree/SimpleMergeSelector.h b/src/Storages/MergeTree/SimpleMergeSelector.h index 11ffe8b672a..1480afaf1d2 100644 --- a/src/Storages/MergeTree/SimpleMergeSelector.h +++ b/src/Storages/MergeTree/SimpleMergeSelector.h @@ -141,6 +141,11 @@ public: double heuristic_to_align_parts_max_absolute_difference_in_powers_of_two = 0.5; double heuristic_to_align_parts_max_score_adjustment = 0.75; + /// If it's not 0, all part ranges that have min_age larger than min_age_to_force_merge + /// will be considered for merging + size_t min_age_to_force_merge = 0; + + /** Heuristic: * From right side of range, remove all parts, that size is less than specified ratio of sum_size. */ diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index 1af0e6e756f..a65af1cf69e 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -798,54 +798,6 @@ void StorageMergeTree::loadMutations() increment.value = std::max(increment.value.load(), current_mutations_by_version.rbegin()->first); } -std::shared_ptr StorageMergeTree::selectOnePartitionToOptimize( - const StorageMetadataPtr & metadata_snapshot, - TableLockHolder & table_lock_holder, - std::unique_lock & lock, - const MergeTreeTransactionPtr & txn, - bool optimize_skip_merged_partitions) -{ - // Select the `best partition to merge. - std::unordered_map partition_parts_sum_diff; - ssize_t base = time(nullptr) - getSettings()->auto_optimize_partition_after_seconds; - auto data_parts = getDataPartsForInternalUsage(); - for (const auto & part : data_parts) - { - if (part->modification_time < base) - partition_parts_sum_diff[part->info.partition_id] += (base - part->modification_time); - } - auto best_partition_it = std::max_element(partition_parts_sum_diff.begin(), partition_parts_sum_diff.end(), [](const auto & e1, const auto & e2) { return e1.second < e2.second; }); - if (best_partition_it == partition_parts_sum_diff.end()) - { - return nullptr; - } - // Merge the selected partition. - String disable_reason; - SelectPartsDecision select_decision; - auto merge_entry = selectPartsToMerge( - metadata_snapshot, - true, - best_partition_it->first, - true, - &disable_reason, - table_lock_holder, - lock, - txn, - optimize_skip_merged_partitions, - &select_decision); - - if (select_decision == SelectPartsDecision::NOTHING_TO_MERGE) - return nullptr; - if (!merge_entry) - { - static constexpr const char * message = "Cannot OPTIMIZE table in background: {}"; - if (disable_reason.empty()) - disable_reason = "unknown reason"; - LOG_INFO(log, message, disable_reason); - } - return merge_entry; -} - MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMerge( const StorageMetadataPtr & metadata_snapshot, bool aggressive, @@ -1203,12 +1155,6 @@ bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assign if (!merge_entry) mutate_entry = selectPartsToMutate(metadata_snapshot, nullptr, share_lock, lock); - if (getSettings()->auto_optimize_partition_interval_seconds - && !merge_entry && !mutate_entry - && time_after_previous_optimize_one_partition.compareAndRestartDeferred(getSettings()->auto_optimize_partition_interval_seconds)) - { - merge_entry = selectOnePartitionToOptimize(metadata_snapshot, share_lock, lock, txn); - } has_mutations = !current_mutations_by_version.empty(); } diff --git a/src/Storages/StorageMergeTree.h b/src/Storages/StorageMergeTree.h index 5d7a24a2f7e..ea2527e44a7 100644 --- a/src/Storages/StorageMergeTree.h +++ b/src/Storages/StorageMergeTree.h @@ -135,8 +135,6 @@ private: AtomicStopwatch time_after_previous_cleanup_temporary_directories; /// For clearOldBrokenDetachedParts AtomicStopwatch time_after_previous_cleanup_broken_detached_parts; - /// For optimizeOnePartition; - AtomicStopwatch time_after_previous_optimize_one_partition; /// Mutex for parts currently processing in background /// merging (also with TTL), mutating or moving. @@ -171,15 +169,6 @@ private: String * out_disable_reason = nullptr, bool optimize_skip_merged_partitions = false); - /** Determines what part within one partition should be merged. - */ - std::shared_ptr selectOnePartitionToOptimize( - const StorageMetadataPtr & metadata_snapshot, - TableLockHolder & table_lock_holder, - std::unique_lock & lock, - const MergeTreeTransactionPtr & txn, - bool optimize_skip_merged_partitions = false); - /// Make part state outdated and queue it to remove without timeout /// If force, then stop merges and block them until part state became outdated. Throw exception if part doesn't exists /// If not force, then take merges selector and check that part is not participating in background operations. diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 186843d3bd6..5818ff37c5b 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -293,14 +293,9 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( merge_selecting_task = getContext()->getSchedulePool().createTask( getStorageID().getFullTableName() + " (StorageReplicatedMergeTree::mergeSelectingTask)", [this] { mergeSelectingTask(); }); - if (getSettings()->auto_optimize_partition_interval_seconds) - auto_optimize_partition_task = getContext()->getSchedulePool().createTask( - getStorageID().getFullTableName() + " (StorageReplicatedMergeTree::autoOptimizePartitionTask)", [this] { autoOptimizePartitionTask(); }); /// Will be activated if we win leader election. merge_selecting_task->deactivate(); - if (auto_optimize_partition_task) - auto_optimize_partition_task->deactivate(); mutations_finalizing_task = getContext()->getSchedulePool().createTask( getStorageID().getFullTableName() + " (StorageReplicatedMergeTree::mutationsFinalizingTask)", [this] { mutationsFinalizingTask(); }); @@ -4457,31 +4452,9 @@ SinkToStoragePtr StorageReplicatedMergeTree::write(const ASTPtr & /*query*/, con local_context); } -void StorageReplicatedMergeTree::autoOptimizePartitionTask() -{ - if (!is_leader) - return; - - auto table_lock = lockForShare(RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations); - try - { - optimizeImpl(nullptr, nullptr, nullptr, true, true, Names{}, table_lock, getContext(), true); - } - catch (const Exception & e) - { - LOG_ERROR(log, "Can't optimize partitions automatically for table: {}, reason: {}", getStorageID().getNameForLogs(), e.displayText()); - } - catch (...) - { - LOG_ERROR(log, "There is a problem when optimizing table: {}, reason: {}", getStorageID().getNameForLogs(), getCurrentExceptionMessage(true)); - } - if (auto_optimize_partition_task) - auto_optimize_partition_task->scheduleAfter(getSettings()->auto_optimize_partition_interval_seconds); -} - bool StorageReplicatedMergeTree::optimize( - const ASTPtr & query, - const StorageMetadataPtr & metadata_snapshot, + const ASTPtr &, + const StorageMetadataPtr &, const ASTPtr & partition, bool final, bool deduplicate, @@ -4491,20 +4464,7 @@ bool StorageReplicatedMergeTree::optimize( /// NOTE: exclusive lock cannot be used here, since this may lead to deadlock (see comments below), /// but it should be safe to use non-exclusive to avoid dropping parts that may be required for processing queue. auto table_lock = lockForShare(query_context->getCurrentQueryId(), query_context->getSettingsRef().lock_acquire_timeout); - return optimizeImpl(query, metadata_snapshot, partition, final, deduplicate, deduplicate_by_columns, table_lock, query_context, false); -} -bool StorageReplicatedMergeTree::optimizeImpl( - const ASTPtr &, - const StorageMetadataPtr &, - const ASTPtr & partition, - bool final, - bool deduplicate, - const Names & deduplicate_by_columns, - TableLockHolder & table_lock, - ContextPtr query_context, - bool auto_optimize_in_background) -{ assertNotReadonly(); if (!is_leader) @@ -4602,12 +4562,8 @@ bool StorageReplicatedMergeTree::optimizeImpl( DataPartsVector data_parts = getVisibleDataPartsVector(query_context); std::unordered_set partition_ids; - ssize_t baseline = time(nullptr) - storage_settings_ptr->auto_optimize_partition_after_seconds; for (const DataPartPtr & part : data_parts) - { - if (!auto_optimize_in_background || part->modification_time < baseline) - partition_ids.emplace(part->info.partition_id); - } + partition_ids.emplace(part->info.partition_id); for (const String & partition_id : partition_ids) { @@ -4626,12 +4582,6 @@ bool StorageReplicatedMergeTree::optimizeImpl( table_lock.reset(); - if (!auto_optimize_in_background) - { - for (auto & merge_entry : merge_entries) - waitForLogEntryToBeProcessedIfNecessary(merge_entry, query_context); - } - return assigned; } diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index 1d315f42ad0..b88b9497e39 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -331,16 +331,7 @@ public: bool canUseZeroCopyReplication() const; private: std::atomic_bool are_restoring_replica {false}; - bool optimizeImpl( - const ASTPtr &, - const StorageMetadataPtr &, - const ASTPtr & partition, - bool final, - bool deduplicate, - const Names & deduplicate_by_columns, - TableLockHolder & table_lock, - ContextPtr query_context, - bool auto_optimize_in_background); + /// Get a sequential consistent view of current parts. ReplicatedMergeTreeQuorumAddedParts::PartitionIdToMaxBlock getMaxAddedBlocks() const; @@ -455,9 +446,6 @@ private: /// A task that marks finished mutations as done. BackgroundSchedulePool::TaskHolder mutations_finalizing_task; - /// A task that optimize partitions automatically background if enabled. - BackgroundSchedulePool::TaskHolder auto_optimize_partition_task { nullptr }; - /// A thread that removes old parts, log entries, and blocks. ReplicatedMergeTreeCleanupThread cleanup_thread; diff --git a/tests/integration/test_auto_optimize_partitions/configs/merge_tree.xml b/tests/integration/test_auto_optimize_partitions/configs/merge_tree.xml deleted file mode 100644 index 9c3da3c99ac..00000000000 --- a/tests/integration/test_auto_optimize_partitions/configs/merge_tree.xml +++ /dev/null @@ -1,5 +0,0 @@ - - - 2 - - diff --git a/tests/integration/test_auto_optimize_partitions/__init__.py b/tests/integration/test_merge_tree_optimize_old_parts/__init__.py similarity index 100% rename from tests/integration/test_auto_optimize_partitions/__init__.py rename to tests/integration/test_merge_tree_optimize_old_parts/__init__.py diff --git a/tests/integration/test_merge_tree_optimize_old_parts/configs/merge_tree.xml b/tests/integration/test_merge_tree_optimize_old_parts/configs/merge_tree.xml new file mode 100644 index 00000000000..07c96a8e41a --- /dev/null +++ b/tests/integration/test_merge_tree_optimize_old_parts/configs/merge_tree.xml @@ -0,0 +1,5 @@ + + + 2 + + diff --git a/tests/integration/test_auto_optimize_partitions/configs/zookeeper_config.xml b/tests/integration/test_merge_tree_optimize_old_parts/configs/zookeeper_config.xml similarity index 100% rename from tests/integration/test_auto_optimize_partitions/configs/zookeeper_config.xml rename to tests/integration/test_merge_tree_optimize_old_parts/configs/zookeeper_config.xml diff --git a/tests/integration/test_auto_optimize_partitions/test.py b/tests/integration/test_merge_tree_optimize_old_parts/test.py similarity index 82% rename from tests/integration/test_auto_optimize_partitions/test.py rename to tests/integration/test_merge_tree_optimize_old_parts/test.py index 77ce56e3787..e78ecbeed60 100644 --- a/tests/integration/test_auto_optimize_partitions/test.py +++ b/tests/integration/test_merge_tree_optimize_old_parts/test.py @@ -38,21 +38,21 @@ def check_expected_result_or_fail(seconds, expected): assert ok -def test_without_auto_optimize_merge_tree(start_cluster): +def test_without_force_merge_old_parts(start_cluster): node.query("CREATE TABLE test (i Int64) ENGINE = MergeTree ORDER BY i;") node.query("INSERT INTO test SELECT 1") node.query("INSERT INTO test SELECT 2") node.query("INSERT INTO test SELECT 3") expected = TSV("""3\n""") - check_expected_result_or_fail(5, expected) + check_expected_result_or_fail(10, expected) node.query("DROP TABLE test;") -def test_auto_optimize_merge_tree(start_cluster): +def test_force_merge_old_parts(start_cluster): node.query( - "CREATE TABLE test (i Int64) ENGINE = MergeTree ORDER BY i SETTINGS auto_optimize_partition_interval_seconds=5;" + "CREATE TABLE test (i Int64) ENGINE = MergeTree ORDER BY i SETTINGS min_age_to_force_merge_seconds=5;" ) node.query("INSERT INTO test SELECT 1") node.query("INSERT INTO test SELECT 2") @@ -64,9 +64,9 @@ def test_auto_optimize_merge_tree(start_cluster): node.query("DROP TABLE test;") -def test_auto_optimize_replicated_merge_tree(start_cluster): +def test_force_merge_old_parts_replicated_merge_tree(start_cluster): node.query( - "CREATE TABLE test (i Int64) ENGINE = ReplicatedMergeTree('/clickhouse/testing/test', 'node') ORDER BY i SETTINGS auto_optimize_partition_interval_seconds=5;" + "CREATE TABLE test (i Int64) ENGINE = ReplicatedMergeTree('/clickhouse/testing/test', 'node') ORDER BY i SETTINGS min_age_to_force_merge_seconds=5;" ) node.query("INSERT INTO test SELECT 1") node.query("INSERT INTO test SELECT 2")