Removed sub-optimal mutation notifications in StorageMergeTree when merges are still going (#27552)

* Removed redundand mutation notifications in `StorageMergeTree` when merges are still going.
This commit is contained in:
Vladimir Chebotarev 2021-09-17 16:19:36 +03:00 committed by GitHub
parent 14e4d49601
commit db516e8c91
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -1019,7 +1019,7 @@ bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assign
auto share_lock = lockForShare(RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations); auto share_lock = lockForShare(RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);
bool has_mutations; bool has_mutations = false;
{ {
std::unique_lock lock(currently_processing_in_background_mutex); std::unique_lock lock(currently_processing_in_background_mutex);
if (merger_mutator.merges_blocker.isCancelled()) if (merger_mutator.merges_blocker.isCancelled())
@ -1027,16 +1027,10 @@ bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assign
merge_entry = selectPartsToMerge(metadata_snapshot, false, {}, false, nullptr, share_lock, lock); merge_entry = selectPartsToMerge(metadata_snapshot, false, {}, false, nullptr, share_lock, lock);
if (!merge_entry) if (!merge_entry)
{
mutate_entry = selectPartsToMutate(metadata_snapshot, nullptr, share_lock); mutate_entry = selectPartsToMutate(metadata_snapshot, nullptr, share_lock);
has_mutations = !current_mutations_by_version.empty();
has_mutations = !current_mutations_by_version.empty(); }
}
if (!mutate_entry && has_mutations)
{
/// Notify in case of errors
std::lock_guard lock(mutation_wait_mutex);
mutation_wait_event.notify_all();
} }
if (merge_entry) if (merge_entry)
@ -1051,6 +1045,14 @@ bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assign
assignee.scheduleMergeMutateTask(task); assignee.scheduleMergeMutateTask(task);
return true; return true;
} }
if (has_mutations)
{
/// Notify in case of errors if no mutation was successfully selected.
/// Otherwise, notification will occur after any of mutations complete.
std::lock_guard lock(mutation_wait_mutex);
mutation_wait_event.notify_all();
}
bool scheduled = false; bool scheduled = false;
if (time_after_previous_cleanup_temporary_directories.compareAndRestartDeferred(getContext()->getSettingsRef().merge_tree_clear_old_temporary_directories_interval_seconds)) if (time_after_previous_cleanup_temporary_directories.compareAndRestartDeferred(getContext()->getSettingsRef().merge_tree_clear_old_temporary_directories_interval_seconds))
{ {