Merge pull request #5659 from yandex/ignore-mutation-limits

Ignore query execution limits and limits for max parts size for merge while executing mutations.
This commit is contained in:
alexey-milovidov 2019-06-21 02:14:41 +03:00 committed by GitHub
commit 606b074a4b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 59 additions and 34 deletions

View File

@ -1196,7 +1196,8 @@ void InterpreterSelectQuery::executeFetchColumns(
pipeline.transform([&](auto & stream)
{
stream->setLimits(limits);
if (!options.ignore_limits)
stream->setLimits(limits);
if (options.to_stage == QueryProcessingStage::Complete)
stream->setQuota(quota);

View File

@ -69,7 +69,7 @@ bool MutationsInterpreter::isStorageTouchedByMutations() const
context_copy.getSettingsRef().merge_tree_uniform_read_distribution = 0;
context_copy.getSettingsRef().max_threads = 1;
BlockInputStreamPtr in = InterpreterSelectQuery(select, context_copy, storage).execute().in;
BlockInputStreamPtr in = InterpreterSelectQuery(select, context_copy, storage, SelectQueryOptions().ignoreLimits()).execute().in;
Block block = in->read();
if (!block.rows())
@ -358,7 +358,7 @@ void MutationsInterpreter::prepare(bool dry_run)
select->setExpression(ASTSelectQuery::Expression::WHERE, std::move(where_expression));
}
interpreter_select = std::make_unique<InterpreterSelectQuery>(select, context, storage, SelectQueryOptions().analyze(dry_run));
interpreter_select = std::make_unique<InterpreterSelectQuery>(select, context, storage, SelectQueryOptions().analyze(dry_run).ignoreLimits());
is_prepared = true;
}

View File

@ -27,6 +27,7 @@ struct SelectQueryOptions
bool only_analyze;
bool modify_inplace;
bool remove_duplicates;
bool ignore_limits;
SelectQueryOptions(QueryProcessingStage::Enum stage = QueryProcessingStage::Complete, size_t depth = 0)
: to_stage(stage)
@ -34,6 +35,7 @@ struct SelectQueryOptions
, only_analyze(false)
, modify_inplace(false)
, remove_duplicates(false)
, ignore_limits(false)
{}
SelectQueryOptions copy() const { return *this; }
@ -71,6 +73,12 @@ struct SelectQueryOptions
subquery_depth = 0;
return *this;
}
SelectQueryOptions & ignoreLimits(bool value = true)
{
ignore_limits = value;
return *this;
}
};
}

View File

@ -126,16 +126,16 @@ MergeTreeDataMergerMutator::MergeTreeDataMergerMutator(MergeTreeData & data_, co
}
UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSize()
UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSizeForMerge()
{
size_t total_threads_in_pool = pool.getNumberOfThreads();
size_t busy_threads_in_pool = CurrentMetrics::values[CurrentMetrics::BackgroundPoolTask].load(std::memory_order_relaxed);
return getMaxSourcePartsSize(total_threads_in_pool, busy_threads_in_pool == 0 ? 0 : busy_threads_in_pool - 1); /// 1 is current thread
return getMaxSourcePartsSizeForMerge(total_threads_in_pool, busy_threads_in_pool == 0 ? 0 : busy_threads_in_pool - 1); /// 1 is current thread
}
UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSize(size_t pool_size, size_t pool_used)
UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSizeForMerge(size_t pool_size, size_t pool_used)
{
if (pool_used > pool_size)
throw Exception("Logical error: invalid arguments passed to getMaxSourcePartsSize: pool_used > pool_size", ErrorCodes::LOGICAL_ERROR);
@ -155,6 +155,12 @@ UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSize(size_t pool_size, size_
}
UInt64 MergeTreeDataMergerMutator::getMaxSourcePartSizeForMutation()
{
return static_cast<UInt64>(DiskSpaceMonitor::getUnreservedFreeSpace(data.full_path) / DISK_USAGE_COEFFICIENT_TO_RESERVE);
}
bool MergeTreeDataMergerMutator::selectPartsToMerge(
FutureMergedMutatedPart & future_part,
bool aggressive,

View File

@ -45,12 +45,17 @@ public:
/** Get maximum total size of parts to do merge, at current moment of time.
* It depends on number of free threads in background_pool and amount of free space in disk.
*/
UInt64 getMaxSourcePartsSize();
UInt64 getMaxSourcePartsSizeForMerge();
/** For explicitly passed size of pool and number of used tasks.
* This method could be used to calculate threshold depending on number of tasks in replication queue.
*/
UInt64 getMaxSourcePartsSize(size_t pool_size, size_t pool_used);
UInt64 getMaxSourcePartsSizeForMerge(size_t pool_size, size_t pool_used);
/** Get maximum total size of parts to do mutation, at current moment of time.
* It depends only on amount of free space in disk.
*/
UInt64 getMaxSourcePartSizeForMutation();
/** Selects which parts to merge. Uses a lot of heuristics.
*

View File

@ -960,7 +960,7 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
* But if all threads are free (maximal size of merge is allowed) then execute any merge,
* (because it may be ordered by OPTIMIZE or early with different settings).
*/
UInt64 max_source_parts_size = merger_mutator.getMaxSourcePartsSize();
UInt64 max_source_parts_size = merger_mutator.getMaxSourcePartsSizeForMerge();
if (max_source_parts_size != data.settings.max_bytes_to_merge_at_max_space_in_pool
&& sum_parts_size_in_bytes > max_source_parts_size)
{

View File

@ -513,7 +513,7 @@ bool StorageMergeTree::merge(
if (partition_id.empty())
{
UInt64 max_source_parts_size = merger_mutator.getMaxSourcePartsSize();
UInt64 max_source_parts_size = merger_mutator.getMaxSourcePartsSizeForMerge();
if (max_source_parts_size > 0)
selected = merger_mutator.selectPartsToMerge(future_part, aggressive, max_source_parts_size, can_merge, out_disable_reason);
else if (out_disable_reason)

View File

@ -2181,35 +2181,33 @@ void StorageReplicatedMergeTree::mergeSelectingTask()
}
else
{
UInt64 max_source_parts_size = merger_mutator.getMaxSourcePartsSize(
UInt64 max_source_parts_size_for_merge = merger_mutator.getMaxSourcePartsSizeForMerge(
settings.max_replicated_merges_in_queue, merges_and_mutations_queued);
UInt64 max_source_part_size_for_mutation = merger_mutator.getMaxSourcePartSizeForMutation();
if (max_source_parts_size > 0)
FutureMergedMutatedPart future_merged_part;
if (max_source_parts_size_for_merge > 0 &&
merger_mutator.selectPartsToMerge(future_merged_part, false, max_source_parts_size_for_merge, merge_pred))
{
FutureMergedMutatedPart future_merged_part;
if (merger_mutator.selectPartsToMerge(future_merged_part, false, max_source_parts_size, merge_pred))
success = createLogEntryToMergeParts(zookeeper, future_merged_part.parts, future_merged_part.name, deduplicate);
}
else if (max_source_part_size_for_mutation > 0 && queue.countMutations() > 0)
{
/// Choose a part to mutate.
DataPartsVector data_parts = getDataPartsVector();
for (const auto & part : data_parts)
{
success = createLogEntryToMergeParts(zookeeper, future_merged_part.parts, future_merged_part.name, deduplicate);
}
else if (queue.countMutations() > 0)
{
/// Choose a part to mutate.
if (part->bytes_on_disk > max_source_part_size_for_mutation)
continue;
DataPartsVector data_parts = getDataPartsVector();
for (const auto & part : data_parts)
std::optional<Int64> desired_mutation_version = merge_pred.getDesiredMutationVersion(part);
if (!desired_mutation_version)
continue;
if (createLogEntryToMutatePart(*part, *desired_mutation_version))
{
if (part->bytes_on_disk > max_source_parts_size)
continue;
std::optional<Int64> desired_mutation_version = merge_pred.getDesiredMutationVersion(part);
if (!desired_mutation_version)
continue;
if (createLogEntryToMutatePart(*part, *desired_mutation_version))
{
success = true;
break;
}
success = true;
break;
}
}
}

View File

@ -0,0 +1,6 @@
<yandex>
<merge_tree>
<max_bytes_to_merge_at_min_space_in_pool>1</max_bytes_to_merge_at_min_space_in_pool>
<max_bytes_to_merge_at_max_space_in_pool>2</max_bytes_to_merge_at_max_space_in_pool>
</merge_tree>
</yandex>

View File

@ -11,7 +11,8 @@ from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', with_zookeeper=True)
node2 = cluster.add_instance('node2', with_zookeeper=True)
# Check that limits on max part size for merges don't affect mutations
node2 = cluster.add_instance('node2', main_configs=["configs/merge_tree.xml"], with_zookeeper=True)
nodes = [node1, node2]
@pytest.fixture(scope="module")