Split select and process merges

(cherry picked from commit 1889eb0ff1)
This commit is contained in:
alesapin 2020-09-30 15:40:46 +03:00
parent f64344f7c5
commit cb7db6d232
4 changed files with 239 additions and 194 deletions

View File

@ -622,6 +622,100 @@ void StorageMergeTree::loadMutations()
increment.value = std::max(Int64(increment.value.load()), current_mutations_by_version.rbegin()->first);
}
std::optional<StorageMergeTree::MergeMutateSelectedEntry> StorageMergeTree::selectPartsToMerge(const StorageMetadataPtr &, bool aggressive, const String & partition_id, bool final, String * out_disable_reason)
{
std::unique_lock lock(currently_processing_in_background_mutex);
auto data_settings = getSettings();
FutureMergedMutatedPart future_part;
/// You must call destructor with unlocked `currently_processing_in_background_mutex`.
CurrentlyMergingPartsTaggerPtr merging_tagger;
MergeList::EntryPtr merge_entry;
auto can_merge = [this, &lock] (const DataPartPtr & left, const DataPartPtr & right, String *) -> bool
{
/// This predicate is checked for the first part of each partition.
/// (left = nullptr, right = "first part of partition")
if (!left)
return !currently_merging_mutating_parts.count(right);
return !currently_merging_mutating_parts.count(left) && !currently_merging_mutating_parts.count(right)
&& getCurrentMutationVersion(left, lock) == getCurrentMutationVersion(right, lock);
};
bool selected = false;
if (partition_id.empty())
{
UInt64 max_source_parts_size = merger_mutator.getMaxSourcePartsSizeForMerge();
bool merge_with_ttl_allowed = getTotalMergesWithTTLInMergeList() < data_settings->max_number_of_merges_with_ttl_in_pool;
/// TTL requirements is much more strict than for regular merge, so
/// if regular not possible, than merge with ttl is not also not
/// possible.
if (max_source_parts_size > 0)
{
selected = merger_mutator.selectPartsToMerge(
future_part,
aggressive,
max_source_parts_size,
can_merge,
merge_with_ttl_allowed,
out_disable_reason);
}
else if (out_disable_reason)
*out_disable_reason = "Current value of max_source_parts_size is zero";
}
else
{
while (true)
{
UInt64 disk_space = getStoragePolicy()->getMaxUnreservedFreeSpace();
selected = merger_mutator.selectAllPartsToMergeWithinPartition(
future_part, disk_space, can_merge, partition_id, final, out_disable_reason);
/// If final - we will wait for currently processing merges to finish and continue.
/// TODO Respect query settings for timeout
if (final
&& !selected
&& !currently_merging_mutating_parts.empty()
&& out_disable_reason
&& out_disable_reason->empty())
{
LOG_DEBUG(log, "Waiting for currently running merges ({} parts are merging right now) to perform OPTIMIZE FINAL",
currently_merging_mutating_parts.size());
if (std::cv_status::timeout == currently_processing_in_background_condition.wait_for(
lock, std::chrono::seconds(DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC)))
{
*out_disable_reason = "Timeout while waiting for already running merges before running OPTIMIZE with FINAL";
break;
}
}
else
break;
}
}
if (!selected)
{
if (out_disable_reason)
{
if (!out_disable_reason->empty())
{
*out_disable_reason += ". ";
}
*out_disable_reason += "Cannot select parts for optimization";
}
return {};
}
merging_tagger = std::make_unique<CurrentlyMergingPartsTagger>(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace(future_part.parts), *this, false);
auto table_id = getStorageID();
merge_entry = global_context.getMergeList().insert(table_id.database_name, table_id.table_name, future_part);
return MergeMutateSelectedEntry{future_part, std::move(merging_tagger), std::move(merge_entry), {}};
}
bool StorageMergeTree::merge(
bool aggressive,
@ -632,99 +726,12 @@ bool StorageMergeTree::merge(
{
auto table_lock_holder = lockForShare(RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);
auto metadata_snapshot = getInMemoryMetadataPtr();
auto data_settings = getSettings();
FutureMergedMutatedPart future_part;
/// You must call destructor with unlocked `currently_processing_in_background_mutex`.
std::optional<CurrentlyMergingPartsTagger> merging_tagger;
MergeList::EntryPtr merge_entry;
{
std::unique_lock lock(currently_processing_in_background_mutex);
auto can_merge = [this, &lock] (const DataPartPtr & left, const DataPartPtr & right, String *) -> bool
{
/// This predicate is checked for the first part of each partition.
/// (left = nullptr, right = "first part of partition")
if (!left)
return !currently_merging_mutating_parts.count(right);
return !currently_merging_mutating_parts.count(left) && !currently_merging_mutating_parts.count(right)
&& getCurrentMutationVersion(left, lock) == getCurrentMutationVersion(right, lock);
};
bool selected = false;
if (partition_id.empty())
{
UInt64 max_source_parts_size = merger_mutator.getMaxSourcePartsSizeForMerge();
bool merge_with_ttl_allowed = getTotalMergesWithTTLInMergeList() < data_settings->max_number_of_merges_with_ttl_in_pool;
/// TTL requirements is much more strict than for regular merge, so
/// if regular not possible, than merge with ttl is not also not
/// possible.
if (max_source_parts_size > 0)
{
selected = merger_mutator.selectPartsToMerge(
future_part,
aggressive,
max_source_parts_size,
can_merge,
merge_with_ttl_allowed,
out_disable_reason);
}
else if (out_disable_reason)
*out_disable_reason = "Current value of max_source_parts_size is zero";
}
else
{
while (true)
{
UInt64 disk_space = getStoragePolicy()->getMaxUnreservedFreeSpace();
selected = merger_mutator.selectAllPartsToMergeWithinPartition(
future_part, disk_space, can_merge, partition_id, final, out_disable_reason);
/// If final - we will wait for currently processing merges to finish and continue.
/// TODO Respect query settings for timeout
if (final
&& !selected
&& !currently_merging_mutating_parts.empty()
&& out_disable_reason
&& out_disable_reason->empty())
{
LOG_DEBUG(log, "Waiting for currently running merges ({} parts are merging right now) to perform OPTIMIZE FINAL",
currently_merging_mutating_parts.size());
if (std::cv_status::timeout == currently_processing_in_background_condition.wait_for(
lock, std::chrono::seconds(DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC)))
{
*out_disable_reason = "Timeout while waiting for already running merges before running OPTIMIZE with FINAL";
break;
}
}
else
break;
}
}
if (!selected)
{
if (out_disable_reason)
{
if (!out_disable_reason->empty())
{
*out_disable_reason += ". ";
}
*out_disable_reason += "Cannot select parts for optimization";
}
return false;
}
merging_tagger.emplace(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace(future_part.parts), *this, metadata_snapshot, false);
auto table_id = getStorageID();
merge_entry = global_context.getMergeList().insert(table_id.database_name, table_id.table_name, future_part);
}
auto merge_mutate_entry = selectPartsToMerge(metadata_snapshot, aggressive, partition_id, final, out_disable_reason);
if (!merge_mutate_entry)
return false;
auto & future_part = merge_mutate_entry->future_part;
/// Logging
Stopwatch stopwatch;
MutableDataPartPtr new_part;
@ -738,14 +745,14 @@ bool StorageMergeTree::merge(
future_part.name,
new_part,
future_part.parts,
merge_entry.get());
merge_mutate_entry->merge_entry.get());
};
try
{
new_part = merger_mutator.mergePartsToTemporaryPart(
future_part, metadata_snapshot, *merge_entry, table_lock_holder, time(nullptr), global_context,
merging_tagger->reserved_space, deduplicate);
future_part, metadata_snapshot, *(merge_mutate_entry->merge_entry), table_lock_holder, time(nullptr),
merge_mutate_entry->tagger->reserved_space, deduplicate);
merger_mutator.renameMergedTemporaryPart(new_part, future_part.parts, nullptr);
write_part_log({});
@ -783,95 +790,98 @@ BackgroundProcessingPoolTaskResult StorageMergeTree::movePartsTask()
}
}
std::optional<StorageMergeTree::MergeMutateSelectedEntry> StorageMergeTree::selectPartsToMutate(const StorageMetadataPtr & metadata_snapshot, String */* disable_reason */)
{
std::lock_guard lock(currently_processing_in_background_mutex);
size_t max_ast_elements = global_context.getSettingsRef().max_expanded_ast_elements;
FutureMergedMutatedPart future_part;
MutationCommands commands;
CurrentlyMergingPartsTaggerPtr tagger;
if (current_mutations_by_version.empty())
return {};
auto mutations_end_it = current_mutations_by_version.end();
for (const auto & part : getDataPartsVector())
{
if (currently_merging_mutating_parts.count(part))
continue;
auto mutations_begin_it = current_mutations_by_version.upper_bound(part->info.getDataVersion());
if (mutations_begin_it == mutations_end_it)
continue;
size_t max_source_part_size = merger_mutator.getMaxSourcePartSizeForMutation();
if (max_source_part_size < part->getBytesOnDisk())
{
LOG_DEBUG(log, "Current max source part size for mutation is {} but part size {}. Will not mutate part {}. "
"Max size depends not only on available space, but also on settings "
"'number_of_free_entries_in_pool_to_execute_mutation' and 'background_pool_size'",
max_source_part_size, part->getBytesOnDisk(), part->name);
continue;
}
size_t current_ast_elements = 0;
for (auto it = mutations_begin_it; it != mutations_end_it; ++it)
{
size_t commands_size = 0;
MutationCommands commands_for_size_validation;
for (const auto & command : it->second.commands)
{
if (command.type != MutationCommand::Type::DROP_COLUMN
&& command.type != MutationCommand::Type::DROP_INDEX
&& command.type != MutationCommand::Type::RENAME_COLUMN)
{
commands_for_size_validation.push_back(command);
}
else
{
commands_size += command.ast->size();
}
}
if (!commands_for_size_validation.empty())
{
MutationsInterpreter interpreter(
shared_from_this(), metadata_snapshot, commands_for_size_validation, global_context, false);
commands_size += interpreter.evaluateCommandsSize();
}
if (current_ast_elements + commands_size >= max_ast_elements)
break;
current_ast_elements += commands_size;
commands.insert(commands.end(), it->second.commands.begin(), it->second.commands.end());
}
auto new_part_info = part->info;
new_part_info.mutation = current_mutations_by_version.rbegin()->first;
future_part.parts.push_back(part);
future_part.part_info = new_part_info;
future_part.name = part->getNewName(new_part_info);
future_part.type = part->getType();
tagger = std::make_unique<CurrentlyMergingPartsTagger>(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace({part}), *this, true);
auto table_id = getStorageID();
MergeList::EntryPtr merge_entry = global_context.getMergeList().insert(table_id.database_name, table_id.table_name, future_part);
return MergeMutateSelectedEntry{future_part, std::move(tagger), std::move(merge_entry), commands};
}
return {};
}
bool StorageMergeTree::tryMutatePart()
{
auto table_lock_holder = lockForShare(RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);
StorageMetadataPtr metadata_snapshot = getInMemoryMetadataPtr();
size_t max_ast_elements = global_context.getSettingsRef().max_expanded_ast_elements;
FutureMergedMutatedPart future_part;
MutationCommands commands;
/// You must call destructor with unlocked `currently_processing_in_background_mutex`.
std::optional<CurrentlyMergingPartsTagger> tagger;
{
std::lock_guard lock(currently_processing_in_background_mutex);
if (current_mutations_by_version.empty())
return false;
auto mutations_end_it = current_mutations_by_version.end();
for (const auto & part : getDataPartsVector())
{
if (currently_merging_mutating_parts.count(part))
continue;
auto mutations_begin_it = current_mutations_by_version.upper_bound(part->info.getDataVersion());
if (mutations_begin_it == mutations_end_it)
continue;
size_t max_source_part_size = merger_mutator.getMaxSourcePartSizeForMutation();
if (max_source_part_size < part->getBytesOnDisk())
{
LOG_DEBUG(log, "Current max source part size for mutation is {} but part size {}. Will not mutate part {}. "
"Max size depends not only on available space, but also on settings "
"'number_of_free_entries_in_pool_to_execute_mutation' and 'background_pool_size'",
max_source_part_size, part->getBytesOnDisk(), part->name);
continue;
}
size_t current_ast_elements = 0;
for (auto it = mutations_begin_it; it != mutations_end_it; ++it)
{
size_t commands_size = 0;
MutationCommands commands_for_size_validation;
for (const auto & command : it->second.commands)
{
if (command.type != MutationCommand::Type::DROP_COLUMN
&& command.type != MutationCommand::Type::DROP_INDEX
&& command.type != MutationCommand::Type::RENAME_COLUMN)
{
commands_for_size_validation.push_back(command);
}
else
{
commands_size += command.ast->size();
}
}
if (!commands_for_size_validation.empty())
{
MutationsInterpreter interpreter(
shared_from_this(), metadata_snapshot, commands_for_size_validation, global_context, false);
commands_size += interpreter.evaluateCommandsSize();
}
if (current_ast_elements + commands_size >= max_ast_elements)
break;
current_ast_elements += commands_size;
commands.insert(commands.end(), it->second.commands.begin(), it->second.commands.end());
}
auto new_part_info = part->info;
new_part_info.mutation = current_mutations_by_version.rbegin()->first;
future_part.parts.push_back(part);
future_part.part_info = new_part_info;
future_part.name = part->getNewName(new_part_info);
future_part.type = part->getType();
tagger.emplace(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace({part}), *this, metadata_snapshot, true);
break;
}
}
if (!tagger)
auto merge_mutate_entry = selectPartsToMutate(metadata_snapshot, nullptr);
if (!merge_mutate_entry)
return false;
auto table_id = getStorageID();
MergeList::EntryPtr merge_entry = global_context.getMergeList().insert(table_id.database_name, table_id.table_name, future_part);
auto & future_part = merge_mutate_entry->future_part;
Stopwatch stopwatch;
MutableDataPartPtr new_part;
@ -884,14 +894,14 @@ bool StorageMergeTree::tryMutatePart()
future_part.name,
new_part,
future_part.parts,
merge_entry.get());
merge_mutate_entry->merge_entry.get());
};
try
{
new_part = merger_mutator.mutatePartToTemporaryPart(
future_part, metadata_snapshot, commands, *merge_entry,
time(nullptr), global_context, tagger->reserved_space, table_lock_holder);
future_part, metadata_snapshot, merge_mutate_entry->commands, *(merge_mutate_entry->merge_entry),
time(nullptr), global_context, merge_mutate_entry->tagger->reserved_space, table_lock_holder);
renameTempPartAndReplace(new_part);

View File

@ -21,6 +21,8 @@
namespace DB
{
struct CurrentlyMergingPartsTagger;
/** See the description of the data structure in MergeTreeData.
*/
class StorageMergeTree final : public ext::shared_ptr_helper<StorageMergeTree>, public MergeTreeData
@ -140,6 +142,20 @@ private:
/// Try and find a single part to mutate and mutate it. If some part was successfully mutated, return true.
bool tryMutatePart();
friend struct CurrentlyMergingPartsTagger;
using CurrentlyMergingPartsTaggerPtr = std::unique_ptr<CurrentlyMergingPartsTagger>;
struct MergeMutateSelectedEntry
{
FutureMergedMutatedPart future_part;
CurrentlyMergingPartsTaggerPtr tagger;
MergeList::EntryPtr merge_entry;
MutationCommands commands;
};
std::optional<MergeMutateSelectedEntry> selectPartsToMerge(const StorageMetadataPtr & metadata_snapshot, bool aggressive, const String & partition_id, bool final, String * disable_reason);
std::optional<MergeMutateSelectedEntry> selectPartsToMutate(const StorageMetadataPtr & metadata_snapshot, String * disable_reason);
BackgroundProcessingPoolTaskResult mergeMutateTask();
@ -173,7 +189,7 @@ private:
friend class MergeTreeBlockOutputStream;
friend class MergeTreeData;
friend struct CurrentlyMergingPartsTagger;
protected:

View File

@ -2537,16 +2537,8 @@ void StorageReplicatedMergeTree::mutationsUpdatingTask()
}
}
BackgroundProcessingPoolTaskResult StorageReplicatedMergeTree::queueTask()
ReplicatedMergeTreeQueue::SelectedEntry StorageReplicatedMergeTree::selectQueueEntry()
{
/// If replication queue is stopped exit immediately as we successfully executed the task
if (queue.actions_blocker.isCancelled())
{
std::this_thread::sleep_for(std::chrono::milliseconds(5));
return BackgroundProcessingPoolTaskResult::SUCCESS;
}
/// This object will mark the element of the queue as running.
ReplicatedMergeTreeQueue::SelectedEntry selected;
@ -2559,14 +2551,14 @@ BackgroundProcessingPoolTaskResult StorageReplicatedMergeTree::queueTask()
tryLogCurrentException(log, __PRETTY_FUNCTION__);
}
LogEntryPtr & entry = selected.first;
return selected;
}
if (!entry)
return BackgroundProcessingPoolTaskResult::NOTHING_TO_DO;
bool StorageReplicatedMergeTree::processQueueEntry(ReplicatedMergeTreeQueue::SelectedEntry & selected_entry)
{
time_t prev_attempt_time = entry->last_attempt_time;
bool res = queue.processEntry([this]{ return getZooKeeper(); }, entry, [&](LogEntryPtr & entry_to_process)
LogEntryPtr & entry = selected_entry.first;
return queue.processEntry([this]{ return getZooKeeper(); }, entry, [&](LogEntryPtr & entry_to_process)
{
try
{
@ -2605,6 +2597,28 @@ BackgroundProcessingPoolTaskResult StorageReplicatedMergeTree::queueTask()
throw;
}
});
}
BackgroundProcessingPoolTaskResult StorageReplicatedMergeTree::queueTask()
{
/// If replication queue is stopped exit immediately as we successfully executed the task
if (queue.actions_blocker.isCancelled())
{
std::this_thread::sleep_for(std::chrono::milliseconds(5));
return BackgroundProcessingPoolTaskResult::SUCCESS;
}
/// This object will mark the element of the queue as running.
ReplicatedMergeTreeQueue::SelectedEntry selected_entry = selectQueueEntry();
LogEntryPtr & entry = selected_entry.first;
if (!entry)
return BackgroundProcessingPoolTaskResult::NOTHING_TO_DO;
time_t prev_attempt_time = entry->last_attempt_time;
bool res = processQueueEntry(selected_entry);
/// We will go to sleep if the processing fails and if we have already processed this record recently.
bool need_sleep = !res && (entry->last_attempt_time - prev_attempt_time < 10);

View File

@ -416,6 +416,11 @@ private:
/// Clone replica if it is lost.
void cloneReplicaIfNeeded(zkutil::ZooKeeperPtr zookeeper);
ReplicatedMergeTreeQueue::SelectedEntry selectQueueEntry();
bool processQueueEntry(ReplicatedMergeTreeQueue::SelectedEntry & entry);
/** Performs actions from the queue.
*/
BackgroundProcessingPoolTaskResult queueTask();