Split merge to separate method

This commit is contained in:
alesapin 2020-09-30 16:49:22 +03:00
parent 1889eb0ff1
commit 14a7a25c4b
2 changed files with 24 additions and 13 deletions

View File

@ -619,6 +619,7 @@ void StorageMergeTree::loadMutations()
std::optional<StorageMergeTree::MergeMutateSelectedEntry> StorageMergeTree::selectPartsToMerge(const StorageMetadataPtr &, bool aggressive, const String & partition_id, bool final, String * out_disable_reason) std::optional<StorageMergeTree::MergeMutateSelectedEntry> StorageMergeTree::selectPartsToMerge(const StorageMetadataPtr &, bool aggressive, const String & partition_id, bool final, String * out_disable_reason)
{ {
auto table_lock_holder = lockForShare(RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);
std::unique_lock lock(currently_processing_in_background_mutex); std::unique_lock lock(currently_processing_in_background_mutex);
auto data_settings = getSettings(); auto data_settings = getSettings();
@ -718,15 +719,18 @@ bool StorageMergeTree::merge(
bool deduplicate, bool deduplicate,
String * out_disable_reason) String * out_disable_reason)
{ {
auto table_lock_holder = lockForShare(RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);
auto metadata_snapshot = getInMemoryMetadataPtr(); auto metadata_snapshot = getInMemoryMetadataPtr();
auto merge_mutate_entry = selectPartsToMerge(metadata_snapshot, aggressive, partition_id, final, out_disable_reason); auto merge_mutate_entry = selectPartsToMerge(metadata_snapshot, aggressive, partition_id, final, out_disable_reason);
if (!merge_mutate_entry) if (!merge_mutate_entry)
return false; return false;
auto & future_part = merge_mutate_entry->future_part; return mergeSelectedParts(metadata_snapshot, deduplicate, *merge_mutate_entry);
/// Logging }
bool StorageMergeTree::mergeSelectedParts(const StorageMetadataPtr & metadata_snapshot, bool deduplicate, MergeMutateSelectedEntry & merge_mutate_entry) {
auto table_lock_holder = lockForShare(RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);
auto & future_part = merge_mutate_entry.future_part;
Stopwatch stopwatch; Stopwatch stopwatch;
MutableDataPartPtr new_part; MutableDataPartPtr new_part;
@ -739,14 +743,14 @@ bool StorageMergeTree::merge(
future_part.name, future_part.name,
new_part, new_part,
future_part.parts, future_part.parts,
merge_mutate_entry->merge_entry.get()); merge_mutate_entry.merge_entry.get());
}; };
try try
{ {
new_part = merger_mutator.mergePartsToTemporaryPart( new_part = merger_mutator.mergePartsToTemporaryPart(
future_part, metadata_snapshot, *(merge_mutate_entry->merge_entry), table_lock_holder, time(nullptr), future_part, metadata_snapshot, *(merge_mutate_entry.merge_entry), table_lock_holder, time(nullptr),
merge_mutate_entry->tagger->reserved_space, deduplicate); merge_mutate_entry.tagger->reserved_space, deduplicate);
merger_mutator.renameMergedTemporaryPart(new_part, future_part.parts, nullptr); merger_mutator.renameMergedTemporaryPart(new_part, future_part.parts, nullptr);
write_part_log({}); write_part_log({});
@ -760,7 +764,6 @@ bool StorageMergeTree::merge(
return true; return true;
} }
bool StorageMergeTree::partIsAssignedToBackgroundOperation(const DataPartPtr & part) const bool StorageMergeTree::partIsAssignedToBackgroundOperation(const DataPartPtr & part) const
{ {
std::lock_guard background_processing_lock(currently_processing_in_background_mutex); std::lock_guard background_processing_lock(currently_processing_in_background_mutex);
@ -786,6 +789,7 @@ BackgroundProcessingPoolTaskResult StorageMergeTree::movePartsTask()
std::optional<StorageMergeTree::MergeMutateSelectedEntry> StorageMergeTree::selectPartsToMutate(const StorageMetadataPtr & metadata_snapshot, String */* disable_reason */) std::optional<StorageMergeTree::MergeMutateSelectedEntry> StorageMergeTree::selectPartsToMutate(const StorageMetadataPtr & metadata_snapshot, String */* disable_reason */)
{ {
auto table_lock_holder = lockForShare(RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);
std::lock_guard lock(currently_processing_in_background_mutex); std::lock_guard lock(currently_processing_in_background_mutex);
size_t max_ast_elements = global_context.getSettingsRef().max_expanded_ast_elements; size_t max_ast_elements = global_context.getSettingsRef().max_expanded_ast_elements;
@ -868,14 +872,19 @@ std::optional<StorageMergeTree::MergeMutateSelectedEntry> StorageMergeTree::sele
bool StorageMergeTree::tryMutatePart() bool StorageMergeTree::tryMutatePart()
{ {
auto table_lock_holder = lockForShare(RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);
StorageMetadataPtr metadata_snapshot = getInMemoryMetadataPtr(); StorageMetadataPtr metadata_snapshot = getInMemoryMetadataPtr();
auto merge_mutate_entry = selectPartsToMutate(metadata_snapshot, nullptr); auto merge_mutate_entry = selectPartsToMutate(metadata_snapshot, nullptr);
if (!merge_mutate_entry) if (!merge_mutate_entry)
return false; return false;
auto & future_part = merge_mutate_entry->future_part; return mutateSelectedPart(metadata_snapshot, *merge_mutate_entry);
}
bool StorageMergeTree::mutateSelectedPart(const StorageMetadataPtr & metadata_snapshot, MergeMutateSelectedEntry & merge_mutate_entry)
{
auto table_lock_holder = lockForShare(RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);
auto & future_part = merge_mutate_entry.future_part;
Stopwatch stopwatch; Stopwatch stopwatch;
MutableDataPartPtr new_part; MutableDataPartPtr new_part;
@ -888,14 +897,14 @@ bool StorageMergeTree::tryMutatePart()
future_part.name, future_part.name,
new_part, new_part,
future_part.parts, future_part.parts,
merge_mutate_entry->merge_entry.get()); merge_mutate_entry.merge_entry.get());
}; };
try try
{ {
new_part = merger_mutator.mutatePartToTemporaryPart( new_part = merger_mutator.mutatePartToTemporaryPart(
future_part, metadata_snapshot, merge_mutate_entry->commands, *(merge_mutate_entry->merge_entry), future_part, metadata_snapshot, merge_mutate_entry.commands, *(merge_mutate_entry.merge_entry),
time(nullptr), global_context, merge_mutate_entry->tagger->reserved_space, table_lock_holder); time(nullptr), global_context, merge_mutate_entry.tagger->reserved_space, table_lock_holder);
renameTempPartAndReplace(new_part); renameTempPartAndReplace(new_part);
@ -912,7 +921,6 @@ bool StorageMergeTree::tryMutatePart()
return true; return true;
} }
BackgroundProcessingPoolTaskResult StorageMergeTree::mergeMutateTask() BackgroundProcessingPoolTaskResult StorageMergeTree::mergeMutateTask()
{ {
if (shutdown_called) if (shutdown_called)

View File

@ -155,7 +155,10 @@ private:
}; };
std::optional<MergeMutateSelectedEntry> selectPartsToMerge(const StorageMetadataPtr & metadata_snapshot, bool aggressive, const String & partition_id, bool final, String * disable_reason); std::optional<MergeMutateSelectedEntry> selectPartsToMerge(const StorageMetadataPtr & metadata_snapshot, bool aggressive, const String & partition_id, bool final, String * disable_reason);
bool mergeSelectedParts(const StorageMetadataPtr & metadata_snapshot, bool deduplicate, MergeMutateSelectedEntry & entry);
std::optional<MergeMutateSelectedEntry> selectPartsToMutate(const StorageMetadataPtr & metadata_snapshot, String * disable_reason); std::optional<MergeMutateSelectedEntry> selectPartsToMutate(const StorageMetadataPtr & metadata_snapshot, String * disable_reason);
bool mutateSelectedPart(const StorageMetadataPtr & metadata_snapshot, MergeMutateSelectedEntry & entry);
BackgroundProcessingPoolTaskResult mergeMutateTask(); BackgroundProcessingPoolTaskResult mergeMutateTask();