From 6d9cbc6f284b82648e7261f4897738e0fd961d9a Mon Sep 17 00:00:00 2001 From: CurtizJ Date: Fri, 14 Jun 2019 22:27:53 +0300 Subject: [PATCH 1/3] ignore query limits from system profile while executing mutations --- dbms/src/Interpreters/InterpreterSelectQuery.cpp | 3 ++- dbms/src/Interpreters/MutationsInterpreter.cpp | 4 ++-- dbms/src/Interpreters/SelectQueryOptions.h | 7 +++++++ 3 files changed, 11 insertions(+), 3 deletions(-) diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index 5560b8512e0..6d867d6c4ec 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -1196,7 +1196,8 @@ void InterpreterSelectQuery::executeFetchColumns( pipeline.transform([&](auto & stream) { - stream->setLimits(limits); + if (!options.ignore_limits) + stream->setLimits(limits); if (options.to_stage == QueryProcessingStage::Complete) stream->setQuota(quota); diff --git a/dbms/src/Interpreters/MutationsInterpreter.cpp b/dbms/src/Interpreters/MutationsInterpreter.cpp index 5c594d685a8..3e63d44b090 100644 --- a/dbms/src/Interpreters/MutationsInterpreter.cpp +++ b/dbms/src/Interpreters/MutationsInterpreter.cpp @@ -69,7 +69,7 @@ bool MutationsInterpreter::isStorageTouchedByMutations() const context_copy.getSettingsRef().merge_tree_uniform_read_distribution = 0; context_copy.getSettingsRef().max_threads = 1; - BlockInputStreamPtr in = InterpreterSelectQuery(select, context_copy, storage).execute().in; + BlockInputStreamPtr in = InterpreterSelectQuery(select, context_copy, storage, SelectQueryOptions().ignoreLimits()).execute().in; Block block = in->read(); if (!block.rows()) @@ -358,7 +358,7 @@ void MutationsInterpreter::prepare(bool dry_run) select->setExpression(ASTSelectQuery::Expression::WHERE, std::move(where_expression)); } - interpreter_select = std::make_unique(select, context, storage, SelectQueryOptions().analyze(dry_run)); + interpreter_select = 
std::make_unique(select, context, storage, SelectQueryOptions().analyze(dry_run).ignoreLimits()); is_prepared = true; } diff --git a/dbms/src/Interpreters/SelectQueryOptions.h b/dbms/src/Interpreters/SelectQueryOptions.h index 0cf5827be3c..cb2161488b0 100644 --- a/dbms/src/Interpreters/SelectQueryOptions.h +++ b/dbms/src/Interpreters/SelectQueryOptions.h @@ -27,6 +27,7 @@ struct SelectQueryOptions bool only_analyze; bool modify_inplace; bool remove_duplicates; + bool ignore_limits; SelectQueryOptions(QueryProcessingStage::Enum stage = QueryProcessingStage::Complete, size_t depth = 0) : to_stage(stage) @@ -71,6 +72,12 @@ struct SelectQueryOptions subquery_depth = 0; return *this; } + + SelectQueryOptions & ignoreLimits(bool value = true) + { + ignore_limits = value; + return *this; + } }; } From e6dd92f9e1c12847f633c0216097b58d867da022 Mon Sep 17 00:00:00 2001 From: CurtizJ Date: Mon, 17 Jun 2019 22:41:48 +0300 Subject: [PATCH 2/3] ignore max part size for merge setting while executing mutations --- .../MergeTree/MergeTreeDataMergerMutator.cpp | 12 +++-- .../MergeTree/MergeTreeDataMergerMutator.h | 9 +++- .../MergeTree/ReplicatedMergeTreeQueue.cpp | 2 +- dbms/src/Storages/StorageMergeTree.cpp | 2 +- .../Storages/StorageReplicatedMergeTree.cpp | 44 +++++++++---------- .../configs/merge_tree.xml | 6 +++ .../test_replicated_mutations/test.py | 3 +- 7 files changed, 47 insertions(+), 31 deletions(-) create mode 100644 dbms/tests/integration/test_replicated_mutations/configs/merge_tree.xml diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index 9ec234de4e4..a81cb13d8be 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -125,16 +125,16 @@ MergeTreeDataMergerMutator::MergeTreeDataMergerMutator(MergeTreeData & data_, co } -UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSize() +UInt64 
MergeTreeDataMergerMutator::getMaxSourcePartsSizeForMerge() { size_t total_threads_in_pool = pool.getNumberOfThreads(); size_t busy_threads_in_pool = CurrentMetrics::values[CurrentMetrics::BackgroundPoolTask].load(std::memory_order_relaxed); - return getMaxSourcePartsSize(total_threads_in_pool, busy_threads_in_pool == 0 ? 0 : busy_threads_in_pool - 1); /// 1 is current thread + return getMaxSourcePartsSizeForMerge(total_threads_in_pool, busy_threads_in_pool == 0 ? 0 : busy_threads_in_pool - 1); /// 1 is current thread } -UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSize(size_t pool_size, size_t pool_used) +UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSizeForMerge(size_t pool_size, size_t pool_used) { if (pool_used > pool_size) throw Exception("Logical error: invalid arguments passed to getMaxSourcePartsSize: pool_used > pool_size", ErrorCodes::LOGICAL_ERROR); @@ -154,6 +154,12 @@ UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSize(size_t pool_size, size_ } +UInt64 MergeTreeDataMergerMutator::getMaxSourcePartSizeForMutation() +{ + return static_cast(DiskSpaceMonitor::getUnreservedFreeSpace(data.full_path) / DISK_USAGE_COEFFICIENT_TO_RESERVE); +} + + bool MergeTreeDataMergerMutator::selectPartsToMerge( FutureMergedMutatedPart & future_part, bool aggressive, diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.h b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.h index 4c300965590..29fd615d39b 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.h +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.h @@ -45,12 +45,17 @@ public: /** Get maximum total size of parts to do merge, at current moment of time. * It depends on number of free threads in background_pool and amount of free space in disk. */ - UInt64 getMaxSourcePartsSize(); + UInt64 getMaxSourcePartsSizeForMerge(); /** For explicitly passed size of pool and number of used tasks. 
* This method could be used to calculate threshold depending on number of tasks in replication queue. */ - UInt64 getMaxSourcePartsSize(size_t pool_size, size_t pool_used); + UInt64 getMaxSourcePartsSizeForMerge(size_t pool_size, size_t pool_used); + + /** Get maximum size of a single part that can be mutated, at the current moment of time. + * It depends only on the amount of free space on disk. + */ + UInt64 getMaxSourcePartSizeForMutation(); /** Selects which parts to merge. Uses a lot of heuristics. * diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index a04958c32a5..6ce01f7b500 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -960,7 +960,7 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry( * But if all threads are free (maximal size of merge is allowed) then execute any merge, * (because it may be ordered by OPTIMIZE or early with differrent settings).
*/ - UInt64 max_source_parts_size = merger_mutator.getMaxSourcePartsSize(); + UInt64 max_source_parts_size = merger_mutator.getMaxSourcePartsSizeForMerge(); if (max_source_parts_size != data.settings.max_bytes_to_merge_at_max_space_in_pool && sum_parts_size_in_bytes > max_source_parts_size) { diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp index 575aee9b497..87526e3f39d 100644 --- a/dbms/src/Storages/StorageMergeTree.cpp +++ b/dbms/src/Storages/StorageMergeTree.cpp @@ -513,7 +513,7 @@ bool StorageMergeTree::merge( if (partition_id.empty()) { - UInt64 max_source_parts_size = merger_mutator.getMaxSourcePartsSize(); + UInt64 max_source_parts_size = merger_mutator.getMaxSourcePartsSizeForMerge(); if (max_source_parts_size > 0) selected = merger_mutator.selectPartsToMerge(future_part, aggressive, max_source_parts_size, can_merge, out_disable_reason); else if (out_disable_reason) diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index d4dd0e18193..e445e78560e 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -2181,35 +2181,33 @@ void StorageReplicatedMergeTree::mergeSelectingTask() } else { - UInt64 max_source_parts_size = merger_mutator.getMaxSourcePartsSize( + UInt64 max_source_parts_size_for_merge = merger_mutator.getMaxSourcePartsSizeForMerge( settings.max_replicated_merges_in_queue, merges_and_mutations_queued); + UInt64 max_source_part_size_for_mutation = merger_mutator.getMaxSourcePartSizeForMutation(); - if (max_source_parts_size > 0) + FutureMergedMutatedPart future_merged_part; + if (max_source_parts_size_for_merge > 0 && + merger_mutator.selectPartsToMerge(future_merged_part, false, max_source_parts_size_for_merge, merge_pred)) { - FutureMergedMutatedPart future_merged_part; - if (merger_mutator.selectPartsToMerge(future_merged_part, false, max_source_parts_size, merge_pred)) + success 
= createLogEntryToMergeParts(zookeeper, future_merged_part.parts, future_merged_part.name, deduplicate); + } + else if (max_source_part_size_for_mutation > 0 && queue.countMutations() > 0) + { + /// Choose a part to mutate. + DataPartsVector data_parts = getDataPartsVector(); + for (const auto & part : data_parts) { - success = createLogEntryToMergeParts(zookeeper, future_merged_part.parts, future_merged_part.name, deduplicate); - } - else if (queue.countMutations() > 0) - { - /// Choose a part to mutate. + if (part->bytes_on_disk > max_source_part_size_for_mutation) + continue; - DataPartsVector data_parts = getDataPartsVector(); - for (const auto & part : data_parts) + std::optional desired_mutation_version = merge_pred.getDesiredMutationVersion(part); + if (!desired_mutation_version) + continue; + + if (createLogEntryToMutatePart(*part, *desired_mutation_version)) { - if (part->bytes_on_disk > max_source_parts_size) - continue; - - std::optional desired_mutation_version = merge_pred.getDesiredMutationVersion(part); - if (!desired_mutation_version) - continue; - - if (createLogEntryToMutatePart(*part, *desired_mutation_version)) - { - success = true; - break; - } + success = true; + break; } } } diff --git a/dbms/tests/integration/test_replicated_mutations/configs/merge_tree.xml b/dbms/tests/integration/test_replicated_mutations/configs/merge_tree.xml new file mode 100644 index 00000000000..6b08fbc5087 --- /dev/null +++ b/dbms/tests/integration/test_replicated_mutations/configs/merge_tree.xml @@ -0,0 +1,6 @@ + + + 1 + 2 + + \ No newline at end of file diff --git a/dbms/tests/integration/test_replicated_mutations/test.py b/dbms/tests/integration/test_replicated_mutations/test.py index 2d142a9e842..351ceff3608 100644 --- a/dbms/tests/integration/test_replicated_mutations/test.py +++ b/dbms/tests/integration/test_replicated_mutations/test.py @@ -11,7 +11,8 @@ from helpers.cluster import ClickHouseCluster cluster = ClickHouseCluster(__file__) node1 = 
cluster.add_instance('node1', with_zookeeper=True) -node2 = cluster.add_instance('node2', with_zookeeper=True) +# Check that limits on max part size for merges don't affect mutations +node2 = cluster.add_instance('node2', main_configs=["configs/merge_tree.xml"], with_zookeeper=True) nodes = [node1, node2] @pytest.fixture(scope="module") From 2a04125360a46e155ebbae636e9bcf03cc7e46c0 Mon Sep 17 00:00:00 2001 From: CurtizJ Date: Tue, 18 Jun 2019 01:07:16 +0300 Subject: [PATCH 3/3] fix ignore_limits option --- dbms/src/Interpreters/SelectQueryOptions.h | 1 + 1 file changed, 1 insertion(+) diff --git a/dbms/src/Interpreters/SelectQueryOptions.h b/dbms/src/Interpreters/SelectQueryOptions.h index cb2161488b0..4fd94a830b8 100644 --- a/dbms/src/Interpreters/SelectQueryOptions.h +++ b/dbms/src/Interpreters/SelectQueryOptions.h @@ -35,6 +35,7 @@ struct SelectQueryOptions , only_analyze(false) , modify_inplace(false) , remove_duplicates(false) + , ignore_limits(false) {} SelectQueryOptions copy() const { return *this; }