diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp
index bd131164880..bab04df70ab 100644
--- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp
+++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp
@@ -1196,7 +1196,8 @@ void InterpreterSelectQuery::executeFetchColumns(
 
         pipeline.transform([&](auto & stream)
         {
-            stream->setLimits(limits);
+            if (!options.ignore_limits)
+                stream->setLimits(limits);
 
             if (options.to_stage == QueryProcessingStage::Complete)
                 stream->setQuota(quota);
diff --git a/dbms/src/Interpreters/MutationsInterpreter.cpp b/dbms/src/Interpreters/MutationsInterpreter.cpp
index 54d7c49a7ae..69339f66712 100644
--- a/dbms/src/Interpreters/MutationsInterpreter.cpp
+++ b/dbms/src/Interpreters/MutationsInterpreter.cpp
@@ -69,7 +69,7 @@ bool MutationsInterpreter::isStorageTouchedByMutations() const
     context_copy.getSettingsRef().merge_tree_uniform_read_distribution = 0;
     context_copy.getSettingsRef().max_threads = 1;
 
-    BlockInputStreamPtr in = InterpreterSelectQuery(select, context_copy, storage).execute().in;
+    BlockInputStreamPtr in = InterpreterSelectQuery(select, context_copy, storage, SelectQueryOptions().ignoreLimits()).execute().in;
 
     Block block = in->read();
     if (!block.rows())
@@ -358,7 +358,7 @@ void MutationsInterpreter::prepare(bool dry_run)
         select->setExpression(ASTSelectQuery::Expression::WHERE, std::move(where_expression));
     }
 
-    interpreter_select = std::make_unique<InterpreterSelectQuery>(select, context, storage, SelectQueryOptions().analyze(dry_run));
+    interpreter_select = std::make_unique<InterpreterSelectQuery>(select, context, storage, SelectQueryOptions().analyze(dry_run).ignoreLimits());
 
     is_prepared = true;
 }
diff --git a/dbms/src/Interpreters/SelectQueryOptions.h b/dbms/src/Interpreters/SelectQueryOptions.h
index 0cf5827be3c..4fd94a830b8 100644
--- a/dbms/src/Interpreters/SelectQueryOptions.h
+++ b/dbms/src/Interpreters/SelectQueryOptions.h
@@ -27,6 +27,7 @@ struct SelectQueryOptions
     bool only_analyze;
     bool modify_inplace;
    bool remove_duplicates;
+    bool ignore_limits;
 
     SelectQueryOptions(QueryProcessingStage::Enum stage = QueryProcessingStage::Complete, size_t depth = 0)
         : to_stage(stage)
@@ -34,6 +35,7 @@ struct SelectQueryOptions
         , only_analyze(false)
         , modify_inplace(false)
         , remove_duplicates(false)
+        , ignore_limits(false)
     {}
 
     SelectQueryOptions copy() const { return *this; }
@@ -71,6 +73,12 @@ struct SelectQueryOptions
         subquery_depth = 0;
         return *this;
     }
+
+    SelectQueryOptions & ignoreLimits(bool value = true)
+    {
+        ignore_limits = value;
+        return *this;
+    }
 };
 
 }
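Note: `ignoreLimits()` follows the builder-style convention the struct already uses — each setter mutates the options object and returns `*this`, so call sites can chain several modifiers in one expression, as `MutationsInterpreter` does above. A minimal self-contained sketch of that pattern (the stripped-down `Options` struct and `main()` below are illustrative stand-ins, not the real class):

```cpp
#include <iostream>

// Stripped-down stand-in for SelectQueryOptions; only the chaining
// mechanics shown in the diff are reproduced here.
struct Options
{
    bool only_analyze = false;
    bool ignore_limits = false;

    Options & analyze(bool value = true)
    {
        only_analyze = value;
        return *this; // returning *this is what makes the calls chainable
    }

    Options & ignoreLimits(bool value = true)
    {
        ignore_limits = value;
        return *this;
    }
};

int main()
{
    // Mirrors the MutationsInterpreter::prepare() call site:
    //   SelectQueryOptions().analyze(dry_run).ignoreLimits()
    Options opts = Options().analyze(false).ignoreLimits();
    std::cout << opts.only_analyze << ' ' << opts.ignore_limits << '\n'; // prints: 0 1
}
```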
diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp
index 0eec2e88e26..705c1ed3525 100644
--- a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp
+++ b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp
@@ -126,16 +126,16 @@ MergeTreeDataMergerMutator::MergeTreeDataMergerMutator(MergeTreeData & data_, co
 }
 
 
-UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSize()
+UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSizeForMerge()
 {
     size_t total_threads_in_pool = pool.getNumberOfThreads();
     size_t busy_threads_in_pool = CurrentMetrics::values[CurrentMetrics::BackgroundPoolTask].load(std::memory_order_relaxed);
 
-    return getMaxSourcePartsSize(total_threads_in_pool, busy_threads_in_pool == 0 ? 0 : busy_threads_in_pool - 1); /// 1 is current thread
+    return getMaxSourcePartsSizeForMerge(total_threads_in_pool, busy_threads_in_pool == 0 ? 0 : busy_threads_in_pool - 1); /// 1 is current thread
 }
 
 
-UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSize(size_t pool_size, size_t pool_used)
+UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSizeForMerge(size_t pool_size, size_t pool_used)
 {
     if (pool_used > pool_size)
         throw Exception("Logical error: invalid arguments passed to getMaxSourcePartsSize: pool_used > pool_size", ErrorCodes::LOGICAL_ERROR);
@@ -155,6 +155,12 @@ UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSize(size_t pool_size, size_
 }
 
 
+UInt64 MergeTreeDataMergerMutator::getMaxSourcePartSizeForMutation()
+{
+    return static_cast<UInt64>(DiskSpaceMonitor::getUnreservedFreeSpace(data.full_path) / DISK_USAGE_COEFFICIENT_TO_RESERVE);
+}
+
+
 bool MergeTreeDataMergerMutator::selectPartsToMerge(
     FutureMergedMutatedPart & future_part,
     bool aggressive,
diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.h b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.h
index 4c300965590..29fd615d39b 100644
--- a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.h
+++ b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.h
@@ -45,12 +45,17 @@ public:
     /** Get maximum total size of parts to do merge, at current moment of time.
       * It depends on number of free threads in background_pool and amount of free space in disk.
       */
-    UInt64 getMaxSourcePartsSize();
+    UInt64 getMaxSourcePartsSizeForMerge();
 
     /** For explicitly passed size of pool and number of used tasks.
      * This method could be used to calculate threshold depending on number of tasks in replication queue.
      */
-    UInt64 getMaxSourcePartsSize(size_t pool_size, size_t pool_used);
+    UInt64 getMaxSourcePartsSizeForMerge(size_t pool_size, size_t pool_used);
+
+    /** Get maximum total size of parts to do mutation, at current moment of time.
+      * It depends only on amount of free space in disk.
+      */
+    UInt64 getMaxSourcePartSizeForMutation();
 
     /** Selects which parts to merge. Uses a lot of heuristics.
       *
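Note: unlike the merge budget, which shrinks as background-pool threads become busy, the mutation budget depends only on unreserved disk space divided by a reserve coefficient. A sketch of that calculation under stated assumptions (the coefficient value `1.1` is assumed here for illustration; the real `DISK_USAGE_COEFFICIENT_TO_RESERVE` constant is defined in `MergeTreeDataMergerMutator.cpp`):

```cpp
#include <cstdint>
#include <cstdio>

// Assumed value; stands in for DISK_USAGE_COEFFICIENT_TO_RESERVE.
static constexpr double kReserveCoefficient = 1.1;

// A mutation rewrites one part, so the only cap is free disk space,
// reduced by a safety margin so the reservation cannot exhaust the
// disk while the new part is being written.
uint64_t maxSourcePartSizeForMutation(uint64_t unreserved_free_space)
{
    return static_cast<uint64_t>(unreserved_free_space / kReserveCoefficient);
}

int main()
{
    // With ~11 GB free, parts up to roughly 10 GB are eligible for mutation.
    std::printf("%llu\n",
        static_cast<unsigned long long>(maxSourcePartSizeForMutation(11'000'000'000ULL)));
}
```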
diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp
index a04958c32a5..6ce01f7b500 100644
--- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp
+++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp
@@ -960,7 +960,7 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
          * But if all threads are free (maximal size of merge is allowed) then execute any merge,
          * (because it may be ordered by OPTIMIZE or early with differrent settings).
          */
-        UInt64 max_source_parts_size = merger_mutator.getMaxSourcePartsSize();
+        UInt64 max_source_parts_size = merger_mutator.getMaxSourcePartsSizeForMerge();
        if (max_source_parts_size != data.settings.max_bytes_to_merge_at_max_space_in_pool
            && sum_parts_size_in_bytes > max_source_parts_size)
        {
diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp
index 575aee9b497..87526e3f39d 100644
--- a/dbms/src/Storages/StorageMergeTree.cpp
+++ b/dbms/src/Storages/StorageMergeTree.cpp
@@ -513,7 +513,7 @@ bool StorageMergeTree::merge(
 
     if (partition_id.empty())
     {
-        UInt64 max_source_parts_size = merger_mutator.getMaxSourcePartsSize();
+        UInt64 max_source_parts_size = merger_mutator.getMaxSourcePartsSizeForMerge();
         if (max_source_parts_size > 0)
             selected = merger_mutator.selectPartsToMerge(future_part, aggressive, max_source_parts_size, can_merge, out_disable_reason);
         else if (out_disable_reason)
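Note: the queue check above postpones a large merge only while the returned budget is below the configured ceiling; when the pool is idle, `getMaxSourcePartsSizeForMerge()` returns `max_bytes_to_merge_at_max_space_in_pool` itself and any merge may run. A condensed, free-standing restatement of that predicate (a sketch, not the real member function):

```cpp
#include <cstdint>

// Condensed restatement of the shouldExecuteLogEntry() check above.
// When the background pool is idle, the merger returns the configured
// ceiling and any merge may execute; otherwise, merges larger than the
// current budget are postponed.
bool shouldPostponeMerge(uint64_t sum_parts_size_in_bytes,
                         uint64_t max_source_parts_size,
                         uint64_t max_bytes_to_merge_at_max_space_in_pool)
{
    return max_source_parts_size != max_bytes_to_merge_at_max_space_in_pool
        && sum_parts_size_in_bytes > max_source_parts_size;
}

int main()
{
    // Example: budget (100) is below the ceiling (1000) and the merge
    // is bigger than the budget (500 > 100) -> postpone.
    return shouldPostponeMerge(500, 100, 1000) ? 0 : 1;
}
```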
diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp
index f2ba47ba4ce..c61edc285f4 100644
--- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp
+++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp
@@ -2181,35 +2181,33 @@ void StorageReplicatedMergeTree::mergeSelectingTask()
         }
         else
         {
-            UInt64 max_source_parts_size = merger_mutator.getMaxSourcePartsSize(
+            UInt64 max_source_parts_size_for_merge = merger_mutator.getMaxSourcePartsSizeForMerge(
                 settings.max_replicated_merges_in_queue, merges_and_mutations_queued);
+            UInt64 max_source_part_size_for_mutation = merger_mutator.getMaxSourcePartSizeForMutation();
 
-            if (max_source_parts_size > 0)
+            FutureMergedMutatedPart future_merged_part;
+            if (max_source_parts_size_for_merge > 0 &&
+                merger_mutator.selectPartsToMerge(future_merged_part, false, max_source_parts_size_for_merge, merge_pred))
             {
-                FutureMergedMutatedPart future_merged_part;
-                if (merger_mutator.selectPartsToMerge(future_merged_part, false, max_source_parts_size, merge_pred))
+                success = createLogEntryToMergeParts(zookeeper, future_merged_part.parts, future_merged_part.name, deduplicate);
+            }
+            else if (max_source_part_size_for_mutation > 0 && queue.countMutations() > 0)
+            {
+                /// Choose a part to mutate.
+                DataPartsVector data_parts = getDataPartsVector();
+                for (const auto & part : data_parts)
                 {
-                    success = createLogEntryToMergeParts(zookeeper, future_merged_part.parts, future_merged_part.name, deduplicate);
-                }
-                else if (queue.countMutations() > 0)
-                {
-                    /// Choose a part to mutate.
+                    if (part->bytes_on_disk > max_source_part_size_for_mutation)
+                        continue;
 
-                    DataPartsVector data_parts = getDataPartsVector();
-                    for (const auto & part : data_parts)
+                    std::optional<Int64> desired_mutation_version = merge_pred.getDesiredMutationVersion(part);
+                    if (!desired_mutation_version)
+                        continue;
+
+                    if (createLogEntryToMutatePart(*part, *desired_mutation_version))
                     {
-                        if (part->bytes_on_disk > max_source_parts_size)
-                            continue;
-
-                        std::optional<Int64> desired_mutation_version = merge_pred.getDesiredMutationVersion(part);
-                        if (!desired_mutation_version)
-                            continue;
-
-                        if (createLogEntryToMutatePart(*part, *desired_mutation_version))
-                        {
-                            success = true;
-                            break;
-                        }
+                        success = true;
+                        break;
                    }
                }
            }
diff --git a/dbms/tests/integration/test_replicated_mutations/configs/merge_tree.xml b/dbms/tests/integration/test_replicated_mutations/configs/merge_tree.xml
new file mode 100644
index 00000000000..6b08fbc5087
--- /dev/null
+++ b/dbms/tests/integration/test_replicated_mutations/configs/merge_tree.xml
@@ -0,0 +1,6 @@
+<yandex>
+    <merge_tree>
+        <max_bytes_to_merge_at_min_space_in_pool>1</max_bytes_to_merge_at_min_space_in_pool>
+        <max_bytes_to_merge_at_max_space_in_pool>2</max_bytes_to_merge_at_max_space_in_pool>
+    </merge_tree>
+</yandex>
\ No newline at end of file
diff --git a/dbms/tests/integration/test_replicated_mutations/test.py b/dbms/tests/integration/test_replicated_mutations/test.py
index 2d142a9e842..351ceff3608 100644
--- a/dbms/tests/integration/test_replicated_mutations/test.py
+++ b/dbms/tests/integration/test_replicated_mutations/test.py
@@ -11,7 +11,8 @@ from helpers.cluster import ClickHouseCluster
 cluster = ClickHouseCluster(__file__)
 
 node1 = cluster.add_instance('node1', with_zookeeper=True)
-node2 = cluster.add_instance('node2', with_zookeeper=True)
+# Check that the limit on max part size for merges doesn't affect mutations
+node2 = cluster.add_instance('node2', main_configs=["configs/merge_tree.xml"], with_zookeeper=True)
 
 nodes = [node1, node2]
 @pytest.fixture(scope="module")