Simplify logic

Antonio Andelic 2022-10-19 12:34:20 +00:00
parent d105796c91
commit e5408aac99
14 changed files with 26 additions and 146 deletions

View File

@@ -333,6 +333,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
         SimpleMergeSelector::Settings merge_settings;
         /// Override value from table settings
         merge_settings.max_parts_to_merge_at_once = data_settings->max_parts_to_merge_at_once;
+        merge_settings.min_age_to_force_merge = data_settings->min_age_to_force_merge_seconds;

         if (aggressive)
             merge_settings.base = 1;
@@ -350,6 +351,7 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
                 *out_disable_reason = "There is no need to merge parts according to merge selector algorithm";
             return SelectPartsDecision::CANNOT_SELECT;
         }
     }

     MergeTreeData::DataPartsVector parts;

View File

@@ -62,8 +62,7 @@ struct Settings;
     M(UInt64, merge_tree_clear_old_temporary_directories_interval_seconds, 60, "The period of executing the clear old temporary directories operation in background.", 0) \
     M(UInt64, merge_tree_clear_old_parts_interval_seconds, 1, "The period of executing the clear old parts operation in background.", 0) \
     M(UInt64, merge_tree_clear_old_broken_detached_parts_ttl_timeout_seconds, 1ULL * 3600 * 24 * 30, "Remove old broken detached parts in the background if they remained intouched for a specified by this setting period of time.", 0) \
-    M(UInt64, auto_optimize_partition_interval_seconds, 0, "The period of executing the auto optimize partitions in background. Set to 0 to disable.", 0) \
-    M(UInt64, auto_optimize_partition_after_seconds, 0, "The number of seconds since last mutation required for partition to be automatically optimized. Set to 0 to disable.", 0) \
+    M(UInt64, min_age_to_force_merge_seconds, 0, "If all parts in a certain range are older than this value, range will be always eligable for merging. Set to 0 to disable.", 0) \
     M(UInt64, merge_tree_enable_clear_old_broken_detached, false, "Enable clearing old broken detached parts operation in background.", 0) \
     M(Bool, remove_rolled_back_parts_immediately, 1, "Setting for an incomplete experimental feature.", 0) \
     \
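The renamed setting is table-level and can be set through SETTINGS in CREATE TABLE, exactly as the updated integration test at the bottom of this diff does. A minimal usage sketch in the same style (the node fixture and table name are illustrative, not part of this commit):

def create_table_with_forced_merges(node):
    # Once every part in a range is older than 60 seconds, the range is
    # always eligible for merging, regardless of the size-based heuristics.
    node.query(
        "CREATE TABLE forced_merge_demo (i Int64) ENGINE = MergeTree "
        "ORDER BY i SETTINGS min_age_to_force_merge_seconds=60;"
    )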

View File

@@ -155,8 +155,6 @@ bool ReplicatedMergeTreeRestartingThread::runImpl()
     storage.mutations_updating_task->activateAndSchedule();
     storage.mutations_finalizing_task->activateAndSchedule();
     storage.merge_selecting_task->activateAndSchedule();
-    if (storage.auto_optimize_partition_task)
-        storage.auto_optimize_partition_task->activateAndSchedule();
     storage.cleanup_thread.start();
     storage.part_check_thread.start();

View File

@@ -102,6 +102,9 @@ bool allow(
     double max_size_to_lower_base_log,
     const SimpleMergeSelector::Settings & settings)
 {
+    if (settings.min_age_to_force_merge && min_age >= settings.min_age_to_force_merge)
+        return true;
+
 // std::cerr << "sum_size: " << sum_size << "\n";

     /// Map size to 0..1 using logarithmic scale
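This early return is the whole mechanism behind the new setting: before any of the size-based scoring below runs, a range whose youngest part is old enough is allowed unconditionally. A rough Python rendering of the check (names mirror the C++ above; the elided scoring is only stubbed):

def allow(min_age: float, min_age_to_force_merge: int) -> bool:
    """Sketch of the early exit added to allow() above.

    min_age is the age of the youngest part in the candidate range,
    so every part in the range is at least this old.
    """
    # A value of 0 keeps the force-merge behaviour disabled (the default).
    if min_age_to_force_merge and min_age >= min_age_to_force_merge:
        return True
    # ... the logarithmic size-based scoring is elided in this sketch ...
    return False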

View File

@@ -141,6 +141,11 @@ public:
     double heuristic_to_align_parts_max_absolute_difference_in_powers_of_two = 0.5;
     double heuristic_to_align_parts_max_score_adjustment = 0.75;

+    /// If it's not 0, all part ranges that have min_age larger than min_age_to_force_merge
+    /// will be considered for merging
+    size_t min_age_to_force_merge = 0;
+
     /** Heuristic:
       * From right side of range, remove all parts, that size is less than specified ratio of sum_size.
       */

View File

@@ -798,54 +798,6 @@ void StorageMergeTree::loadMutations()
         increment.value = std::max(increment.value.load(), current_mutations_by_version.rbegin()->first);
 }

-std::shared_ptr<MergeMutateSelectedEntry> StorageMergeTree::selectOnePartitionToOptimize(
-    const StorageMetadataPtr & metadata_snapshot,
-    TableLockHolder & table_lock_holder,
-    std::unique_lock<std::mutex> & lock,
-    const MergeTreeTransactionPtr & txn,
-    bool optimize_skip_merged_partitions)
-{
-    // Select the best partition to merge.
-    std::unordered_map<String, Int32> partition_parts_sum_diff;
-    ssize_t base = time(nullptr) - getSettings()->auto_optimize_partition_after_seconds;
-    auto data_parts = getDataPartsForInternalUsage();
-    for (const auto & part : data_parts)
-    {
-        if (part->modification_time < base)
-            partition_parts_sum_diff[part->info.partition_id] += (base - part->modification_time);
-    }
-    auto best_partition_it = std::max_element(partition_parts_sum_diff.begin(), partition_parts_sum_diff.end(), [](const auto & e1, const auto & e2) { return e1.second < e2.second; });
-    if (best_partition_it == partition_parts_sum_diff.end())
-    {
-        return nullptr;
-    }
-
-    // Merge the selected partition.
-    String disable_reason;
-    SelectPartsDecision select_decision;
-    auto merge_entry = selectPartsToMerge(
-        metadata_snapshot,
-        true,
-        best_partition_it->first,
-        true,
-        &disable_reason,
-        table_lock_holder,
-        lock,
-        txn,
-        optimize_skip_merged_partitions,
-        &select_decision);
-    if (select_decision == SelectPartsDecision::NOTHING_TO_MERGE)
-        return nullptr;
-    if (!merge_entry)
-    {
-        static constexpr const char * message = "Cannot OPTIMIZE table in background: {}";
-        if (disable_reason.empty())
-            disable_reason = "unknown reason";
-        LOG_INFO(log, message, disable_reason);
-    }
-    return merge_entry;
-}
-
 MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMerge(
     const StorageMetadataPtr & metadata_snapshot,
     bool aggressive,
@@ -1203,12 +1155,6 @@ bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assign
         if (!merge_entry)
             mutate_entry = selectPartsToMutate(metadata_snapshot, nullptr, share_lock, lock);

-        if (getSettings()->auto_optimize_partition_interval_seconds
-            && !merge_entry && !mutate_entry
-            && time_after_previous_optimize_one_partition.compareAndRestartDeferred(getSettings()->auto_optimize_partition_interval_seconds))
-        {
-            merge_entry = selectOnePartitionToOptimize(metadata_snapshot, share_lock, lock, txn);
-        }
-
         has_mutations = !current_mutations_by_version.empty();
     }

View File

@@ -135,8 +135,6 @@ private:
     AtomicStopwatch time_after_previous_cleanup_temporary_directories;
     /// For clearOldBrokenDetachedParts
     AtomicStopwatch time_after_previous_cleanup_broken_detached_parts;
-    /// For optimizeOnePartition;
-    AtomicStopwatch time_after_previous_optimize_one_partition;

     /// Mutex for parts currently processing in background
     /// merging (also with TTL), mutating or moving.
@@ -171,15 +169,6 @@ private:
         String * out_disable_reason = nullptr,
         bool optimize_skip_merged_partitions = false);

-    /** Determines what part within one partition should be merged.
-     */
-    std::shared_ptr<MergeMutateSelectedEntry> selectOnePartitionToOptimize(
-        const StorageMetadataPtr & metadata_snapshot,
-        TableLockHolder & table_lock_holder,
-        std::unique_lock<std::mutex> & lock,
-        const MergeTreeTransactionPtr & txn,
-        bool optimize_skip_merged_partitions = false);
-
     /// Make part state outdated and queue it to remove without timeout
     /// If force, then stop merges and block them until part state became outdated. Throw exception if part doesn't exists
     /// If not force, then take merges selector and check that part is not participating in background operations.

View File

@@ -293,14 +293,9 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
     merge_selecting_task = getContext()->getSchedulePool().createTask(
         getStorageID().getFullTableName() + " (StorageReplicatedMergeTree::mergeSelectingTask)", [this] { mergeSelectingTask(); });

-    if (getSettings()->auto_optimize_partition_interval_seconds)
-        auto_optimize_partition_task = getContext()->getSchedulePool().createTask(
-            getStorageID().getFullTableName() + " (StorageReplicatedMergeTree::autoOptimizePartitionTask)", [this] { autoOptimizePartitionTask(); });
-
     /// Will be activated if we win leader election.
     merge_selecting_task->deactivate();
-    if (auto_optimize_partition_task)
-        auto_optimize_partition_task->deactivate();

     mutations_finalizing_task = getContext()->getSchedulePool().createTask(
         getStorageID().getFullTableName() + " (StorageReplicatedMergeTree::mutationsFinalizingTask)", [this] { mutationsFinalizingTask(); });
@@ -4457,31 +4452,9 @@ SinkToStoragePtr StorageReplicatedMergeTree::write(const ASTPtr & /*query*/, con
         local_context);
 }

-void StorageReplicatedMergeTree::autoOptimizePartitionTask()
-{
-    if (!is_leader)
-        return;
-
-    auto table_lock = lockForShare(RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);
-    try
-    {
-        optimizeImpl(nullptr, nullptr, nullptr, true, true, Names{}, table_lock, getContext(), true);
-    }
-    catch (const Exception & e)
-    {
-        LOG_ERROR(log, "Can't optimize partitions automatically for table: {}, reason: {}", getStorageID().getNameForLogs(), e.displayText());
-    }
-    catch (...)
-    {
-        LOG_ERROR(log, "There is a problem when optimizing table: {}, reason: {}", getStorageID().getNameForLogs(), getCurrentExceptionMessage(true));
-    }
-
-    if (auto_optimize_partition_task)
-        auto_optimize_partition_task->scheduleAfter(getSettings()->auto_optimize_partition_interval_seconds);
-}
-
 bool StorageReplicatedMergeTree::optimize(
-    const ASTPtr & query,
-    const StorageMetadataPtr & metadata_snapshot,
+    const ASTPtr &,
+    const StorageMetadataPtr &,
     const ASTPtr & partition,
     bool final,
     bool deduplicate,
@@ -4491,20 +4464,7 @@ bool StorageReplicatedMergeTree::optimize(
     /// NOTE: exclusive lock cannot be used here, since this may lead to deadlock (see comments below),
     /// but it should be safe to use non-exclusive to avoid dropping parts that may be required for processing queue.
     auto table_lock = lockForShare(query_context->getCurrentQueryId(), query_context->getSettingsRef().lock_acquire_timeout);
-    return optimizeImpl(query, metadata_snapshot, partition, final, deduplicate, deduplicate_by_columns, table_lock, query_context, false);
-}
-
-bool StorageReplicatedMergeTree::optimizeImpl(
-    const ASTPtr &,
-    const StorageMetadataPtr &,
-    const ASTPtr & partition,
-    bool final,
-    bool deduplicate,
-    const Names & deduplicate_by_columns,
-    TableLockHolder & table_lock,
-    ContextPtr query_context,
-    bool auto_optimize_in_background)
-{
     assertNotReadonly();

     if (!is_leader)
@@ -4602,12 +4562,8 @@ bool StorageReplicatedMergeTree::optimizeImpl(
         DataPartsVector data_parts = getVisibleDataPartsVector(query_context);
         std::unordered_set<String> partition_ids;

-        ssize_t baseline = time(nullptr) - storage_settings_ptr->auto_optimize_partition_after_seconds;
         for (const DataPartPtr & part : data_parts)
-        {
-            if (!auto_optimize_in_background || part->modification_time < baseline)
-                partition_ids.emplace(part->info.partition_id);
-        }
+            partition_ids.emplace(part->info.partition_id);

         for (const String & partition_id : partition_ids)
         {
@@ -4626,12 +4582,6 @@ bool StorageReplicatedMergeTree::optimizeImpl(
         table_lock.reset();

-        if (!auto_optimize_in_background)
-        {
-            for (auto & merge_entry : merge_entries)
-                waitForLogEntryToBeProcessedIfNecessary(merge_entry, query_context);
-        }
-
         return assigned;
     }

View File

@@ -331,16 +331,7 @@ public:
     bool canUseZeroCopyReplication() const;
 private:
     std::atomic_bool are_restoring_replica {false};

-    bool optimizeImpl(
-        const ASTPtr &,
-        const StorageMetadataPtr &,
-        const ASTPtr & partition,
-        bool final,
-        bool deduplicate,
-        const Names & deduplicate_by_columns,
-        TableLockHolder & table_lock,
-        ContextPtr query_context,
-        bool auto_optimize_in_background);
-
     /// Get a sequential consistent view of current parts.
     ReplicatedMergeTreeQuorumAddedParts::PartitionIdToMaxBlock getMaxAddedBlocks() const;
@@ -455,9 +446,6 @@ private:
     /// A task that marks finished mutations as done.
     BackgroundSchedulePool::TaskHolder mutations_finalizing_task;

-    /// A task that optimize partitions automatically background if enabled.
-    BackgroundSchedulePool::TaskHolder auto_optimize_partition_task { nullptr };
-
     /// A thread that removes old parts, log entries, and blocks.
     ReplicatedMergeTreeCleanupThread cleanup_thread;

View File

@@ -1,5 +0,0 @@
-<clickhouse>
-    <merge_tree>
-        <auto_optimize_partition_interval_seconds>2</auto_optimize_partition_interval_seconds>
-    </merge_tree>
-</clickhouse>

View File

@@ -0,0 +1,5 @@
+<clickhouse>
+    <merge_tree>
+        <min_age_to_force_merge_seconds>2</min_age_to_force_merge_seconds>
+    </merge_tree>
+</clickhouse>

View File

@@ -38,21 +38,21 @@ def check_expected_result_or_fail(seconds, expected):
     assert ok


-def test_without_auto_optimize_merge_tree(start_cluster):
+def test_without_force_merge_old_parts(start_cluster):
     node.query("CREATE TABLE test (i Int64) ENGINE = MergeTree ORDER BY i;")
     node.query("INSERT INTO test SELECT 1")
     node.query("INSERT INTO test SELECT 2")
     node.query("INSERT INTO test SELECT 3")

     expected = TSV("""3\n""")
-    check_expected_result_or_fail(5, expected)
+    check_expected_result_or_fail(10, expected)

     node.query("DROP TABLE test;")


-def test_auto_optimize_merge_tree(start_cluster):
+def test_force_merge_old_parts(start_cluster):
     node.query(
-        "CREATE TABLE test (i Int64) ENGINE = MergeTree ORDER BY i SETTINGS auto_optimize_partition_interval_seconds=5;"
+        "CREATE TABLE test (i Int64) ENGINE = MergeTree ORDER BY i SETTINGS min_age_to_force_merge_seconds=5;"
     )
     node.query("INSERT INTO test SELECT 1")
     node.query("INSERT INTO test SELECT 2")
@@ -64,9 +64,9 @@ def test_auto_optimize_merge_tree(start_cluster):
     node.query("DROP TABLE test;")


-def test_auto_optimize_replicated_merge_tree(start_cluster):
+def test_force_merge_old_parts_replicated_merge_tree(start_cluster):
     node.query(
-        "CREATE TABLE test (i Int64) ENGINE = ReplicatedMergeTree('/clickhouse/testing/test', 'node') ORDER BY i SETTINGS auto_optimize_partition_interval_seconds=5;"
+        "CREATE TABLE test (i Int64) ENGINE = ReplicatedMergeTree('/clickhouse/testing/test', 'node') ORDER BY i SETTINGS min_age_to_force_merge_seconds=5;"
     )
     node.query("INSERT INTO test SELECT 1")
     node.query("INSERT INTO test SELECT 2")