Merge pull request #70647 from ClickHouse/add_ability_to_change_merge_selecting

Add ability to choose merge seleting algorithm
This commit is contained in:
alesapin 2024-10-16 10:46:58 +00:00 committed by GitHub
commit 87404deb4d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
24 changed files with 269 additions and 51 deletions

View File

@ -244,6 +244,7 @@ add_object_library(clickhouse_storages Storages)
add_object_library(clickhouse_storages_mysql Storages/MySQL)
add_object_library(clickhouse_storages_distributed Storages/Distributed)
add_object_library(clickhouse_storages_mergetree Storages/MergeTree)
add_object_library(clickhouse_storages_mergetree_merge_selectors Storages/MergeTree/MergeSelectors)
add_object_library(clickhouse_storages_statistics Storages/Statistics)
add_object_library(clickhouse_storages_liveview Storages/LiveView)
add_object_library(clickhouse_storages_windowview Storages/WindowView)

View File

@ -0,0 +1,12 @@
#pragma once
#include <cstdint>
namespace DB
{
enum class MergeSelectorAlgorithm : uint8_t
{
SIMPLE,
};
}

View File

@ -1,7 +1,6 @@
#include <Core/SettingsEnums.h>
#include <magic_enum.hpp>
#include <Access/Common/SQLSecurityDefs.h>
#include <boost/range/adaptor/map.hpp>
@ -273,4 +272,10 @@ IMPLEMENT_SETTING_ENUM(
{{"user_display", IdentifierQuotingRule::UserDisplay},
{"when_necessary", IdentifierQuotingRule::WhenNecessary},
{"always", IdentifierQuotingRule::Always}})
IMPLEMENT_SETTING_ENUM(
MergeSelectorAlgorithm,
ErrorCodes::BAD_ARGUMENTS,
{{"Simple", MergeSelectorAlgorithm::SIMPLE}})
}

View File

@ -14,6 +14,7 @@
#include <Parsers/IdentifierQuotingStyle.h>
#include <QueryPipeline/SizeLimits.h>
#include <Common/ShellCommandSettings.h>
#include <Core/MergeSelectorAlgorithm.h>
namespace DB
@ -363,4 +364,6 @@ enum class GroupArrayActionWhenLimitReached : uint8_t
};
DECLARE_SETTING_ENUM(GroupArrayActionWhenLimitReached)
DECLARE_SETTING_ENUM(MergeSelectorAlgorithm)
}

View File

@ -0,0 +1 @@
add_subdirectory(MergeSelectors)

View File

@ -1,11 +1,17 @@
#include <Storages/MergeTree/AllMergeSelector.h>
#include <cmath>
#include <Storages/MergeTree/MergeSelectors/AllMergeSelector.h>
#include <Storages/MergeTree/MergeSelectors/MergeSelectorFactory.h>
namespace DB
{
void registerAllMergeSelector(MergeSelectorFactory & factory)
{
factory.registerPrivateSelector("All", [](const std::any &)
{
return std::make_shared<AllMergeSelector>();
});
}
AllMergeSelector::PartsRange AllMergeSelector::select(
const PartsRanges & parts_ranges,
size_t /*max_total_size_to_merge*/)

View File

@ -1,6 +1,6 @@
#pragma once
#include <Storages/MergeTree/MergeSelector.h>
#include <Storages/MergeTree/MergeSelectors/MergeSelector.h>
namespace DB

View File

@ -0,0 +1,52 @@
#include <Storages/MergeTree/MergeSelectors/MergeSelectorFactory.h>
#include <Common/Exception.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
MergeSelectorFactory & MergeSelectorFactory::instance()
{
static MergeSelectorFactory ret;
return ret;
}
void MergeSelectorFactory::registerPrivateSelector(std::string name, MergeSelectorFactory::Creator && creator)
{
if (!creators.emplace(name, creator).second)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Merge selector '{}' already exists", name);
}
void MergeSelectorFactory::registerPublicSelector(std::string name, MergeSelectorAlgorithm enum_value, Creator && creator)
{
registerPrivateSelector(name, std::move(creator));
if (!enum_to_name_mapping.emplace(enum_value, name).second)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Merge select with enum value {} already exists with different name", enum_value);
}
MergeSelectorPtr MergeSelectorFactory::get(const std::string & name, const std::any & settings) const
{
auto it = creators.find(name);
if (it == creators.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown merge selector {}", name);
return it->second(settings);
}
MergeSelectorPtr MergeSelectorFactory::get(MergeSelectorAlgorithm algorithm, const std::any & settings) const
{
auto it = enum_to_name_mapping.find(algorithm);
if (it == enum_to_name_mapping.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown merge selector {}", algorithm);
return get(it->second, settings);
}
}

View File

@ -0,0 +1,39 @@
#pragma once
#include <unordered_map>
#include <functional>
#include <memory>
#include <string>
#include <any>
#include <boost/noncopyable.hpp>
#include <Core/MergeSelectorAlgorithm.h>
namespace DB
{
class IMergeSelector;
using MergeSelectorPtr = std::shared_ptr<IMergeSelector>;
class MergeSelectorFactory final : private boost::noncopyable
{
private:
using Creator = std::function<MergeSelectorPtr(std::any)>;
using CreatorByNameMap = std::unordered_map<std::string, Creator>;
using EnumToName = std::unordered_map<MergeSelectorAlgorithm, std::string>;
CreatorByNameMap creators;
EnumToName enum_to_name_mapping;
MergeSelectorFactory() = default;
public:
static MergeSelectorFactory & instance();
MergeSelectorPtr get(const std::string & name, const std::any & settings = {}) const;
MergeSelectorPtr get(MergeSelectorAlgorithm algorithm, const std::any & settings = {}) const;
void registerPrivateSelector(std::string name, Creator && creator);
void registerPublicSelector(std::string name, MergeSelectorAlgorithm enum_value, Creator && creator);
};
}

View File

@ -1,4 +1,6 @@
#include <Storages/MergeTree/SimpleMergeSelector.h>
#include <Storages/MergeTree/MergeSelectors/SimpleMergeSelector.h>
#include <Storages/MergeTree/MergeSelectors/MergeSelectorFactory.h>
#include <Core/MergeSelectorAlgorithm.h>
#include <base/interpolate.h>
@ -10,6 +12,14 @@
namespace DB
{
void registerSimpleMergeSelector(MergeSelectorFactory & factory)
{
factory.registerPublicSelector("Simple", MergeSelectorAlgorithm::SIMPLE, [](const std::any & settings)
{
return std::make_shared<SimpleMergeSelector>(std::any_cast<SimpleMergeSelector::Settings>(settings));
});
}
namespace
{

View File

@ -1,6 +1,6 @@
#pragma once
#include <Storages/MergeTree/MergeSelector.h>
#include <Storages/MergeTree/MergeSelectors/MergeSelector.h>
/**

View File

@ -1,14 +1,32 @@
#include <Storages/MergeTree/TTLMergeSelector.h>
#include <Storages/MergeTree/MergeSelectors/TTLMergeSelector.h>
#include <Storages/MergeTree/MergeSelectors/MergeSelectorFactory.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Parsers/queryToString.h>
#include <algorithm>
#include <cmath>
#include <any>
namespace DB
{
void registerTTLDeleteMergeSelector(MergeSelectorFactory & factory)
{
factory.registerPrivateSelector("TTLDelete", [](const std::any & params)
{
return std::make_shared<TTLDeleteMergeSelector>(std::any_cast<TTLDeleteMergeSelector::Params>(params));
});
}
void registerTTLRecompressMergeSelector(MergeSelectorFactory & factory)
{
factory.registerPrivateSelector("TTLRecompress", [](const std::any & params)
{
return std::make_shared<TTLRecompressMergeSelector>(std::any_cast<TTLRecompressMergeSelector::Params>(params));
});
}
const String & getPartitionIdForPart(const ITTLMergeSelector::Part & part_info)
{
const MergeTreeData::DataPartPtr & part = part_info.getDataPartPtr();

View File

@ -1,7 +1,7 @@
#pragma once
#include <base/types.h>
#include <Storages/MergeTree/MergeSelector.h>
#include <Storages/MergeTree/MergeSelectors/MergeSelector.h>
#include <Storages/TTLDescription.h>
#include <map>
@ -58,10 +58,18 @@ class TTLDeleteMergeSelector : public ITTLMergeSelector
public:
using PartitionIdToTTLs = std::map<String, time_t>;
TTLDeleteMergeSelector(PartitionIdToTTLs & merge_due_times_, time_t current_time_, Int64 merge_cooldown_time_,
bool only_drop_parts_, bool dry_run_)
: ITTLMergeSelector(merge_due_times_, current_time_, merge_cooldown_time_, dry_run_)
, only_drop_parts(only_drop_parts_) {}
struct Params
{
PartitionIdToTTLs & merge_due_times;
time_t current_time;
Int64 merge_cooldown_time;
bool only_drop_parts;
bool dry_run;
};
explicit TTLDeleteMergeSelector(const Params & params)
: ITTLMergeSelector(params.merge_due_times, params.current_time, params.merge_cooldown_time, params.dry_run)
, only_drop_parts(params.only_drop_parts) {}
time_t getTTLForPart(const IMergeSelector::Part & part) const override;
@ -78,10 +86,18 @@ private:
class TTLRecompressMergeSelector : public ITTLMergeSelector
{
public:
TTLRecompressMergeSelector(PartitionIdToTTLs & merge_due_times_, time_t current_time_, Int64 merge_cooldown_time_,
const TTLDescriptions & recompression_ttls_, bool dry_run_)
: ITTLMergeSelector(merge_due_times_, current_time_, merge_cooldown_time_, dry_run_)
, recompression_ttls(recompression_ttls_)
struct Params
{
PartitionIdToTTLs & merge_due_times;
time_t current_time;
Int64 merge_cooldown_time;
TTLDescriptions recompression_ttls;
bool dry_run;
};
explicit TTLRecompressMergeSelector(const Params & params)
: ITTLMergeSelector(params.merge_due_times, params.current_time, params.merge_cooldown_time, params.dry_run)
, recompression_ttls(params.recompression_ttls)
{}
/// Return part min recompression TTL.

View File

@ -0,0 +1,23 @@
#include <Storages/MergeTree/MergeSelectors/MergeSelector.h>
#include <Storages/MergeTree/MergeSelectors/MergeSelectorFactory.h>
namespace DB
{
void registerSimpleMergeSelector(MergeSelectorFactory & factory);
void registerAllMergeSelector(MergeSelectorFactory & factory);
void registerTTLDeleteMergeSelector(MergeSelectorFactory & factory);
void registerTTLRecompressMergeSelector(MergeSelectorFactory & factory);
void registerMergeSelectors()
{
auto & factory = MergeSelectorFactory::instance();
registerSimpleMergeSelector(factory);
registerAllMergeSelector(factory);
registerTTLDeleteMergeSelector(factory);
registerTTLRecompressMergeSelector(factory);
}
}

View File

@ -0,0 +1,6 @@
#pragma once
namespace DB
{
void registerMergeSelectors();
}

View File

@ -2,9 +2,9 @@
#include <Storages/MergeTree/MergedBlockOutputStream.h>
#include <Storages/MergeTree/MergedColumnOnlyOutputStream.h>
#include <Storages/MergeTree/SimpleMergeSelector.h>
#include <Storages/MergeTree/AllMergeSelector.h>
#include <Storages/MergeTree/TTLMergeSelector.h>
#include <Storages/MergeTree/MergeSelectors/SimpleMergeSelector.h>
#include <Storages/MergeTree/MergeSelectors/AllMergeSelector.h>
#include <Storages/MergeTree/MergeSelectors/TTLMergeSelector.h>
#include <Storages/MergeTree/MergeList.h>
#include <Storages/MergeTree/MergeTreeDataWriter.h>
#include <Storages/MergeTree/StorageFromMergeTreeDataPart.h>
@ -15,6 +15,7 @@
#include <Storages/MergeTree/MergeProgress.h>
#include <Storages/MergeTree/MergeTask.h>
#include <Storages/MergeTree/ActiveDataPartSet.h>
#include <Storages/MergeTree/MergeSelectors/MergeSelectorFactory.h>
#include <Processors/Transforms/TTLTransform.h>
#include <Processors/Transforms/TTLCalcTransform.h>
@ -65,6 +66,7 @@ namespace MergeTreeSetting
extern const MergeTreeSettingsUInt64 number_of_free_entries_in_pool_to_execute_mutation;
extern const MergeTreeSettingsUInt64 number_of_free_entries_in_pool_to_lower_max_size_of_merge;
extern const MergeTreeSettingsBool ttl_only_drop_parts;
extern const MergeTreeSettingsMergeSelectorAlgorithm merge_selector_algorithm;
}
namespace ErrorCodes
@ -469,13 +471,17 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMergeFromRanges(
if (metadata_snapshot->hasAnyTTL() && merge_with_ttl_allowed && !ttl_merges_blocker.isCancelled())
{
TTLDeleteMergeSelector::Params params_drop
{
.merge_due_times = next_delete_ttl_merge_times_by_partition,
.current_time = current_time,
.merge_cooldown_time = (*data_settings)[MergeTreeSetting::merge_with_ttl_timeout],
.only_drop_parts = true,
.dry_run = dry_run
};
/// TTL delete is preferred to recompression
TTLDeleteMergeSelector drop_ttl_selector(
next_delete_ttl_merge_times_by_partition,
current_time,
(*data_settings)[MergeTreeSetting::merge_with_ttl_timeout],
/*only_drop_parts*/ true,
dry_run);
TTLDeleteMergeSelector drop_ttl_selector(params_drop);
/// The size of the completely expired part of TTL drop is not affected by the merge pressure and the size of the storage space
parts_to_merge = drop_ttl_selector.select(parts_ranges, (*data_settings)[MergeTreeSetting::max_bytes_to_merge_at_max_space_in_pool]);
@ -485,12 +491,15 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMergeFromRanges(
}
else if (!(*data_settings)[MergeTreeSetting::ttl_only_drop_parts])
{
TTLDeleteMergeSelector delete_ttl_selector(
next_delete_ttl_merge_times_by_partition,
current_time,
(*data_settings)[MergeTreeSetting::merge_with_ttl_timeout],
/*only_drop_parts*/ false,
dry_run);
TTLDeleteMergeSelector::Params params_delete
{
.merge_due_times = next_delete_ttl_merge_times_by_partition,
.current_time = current_time,
.merge_cooldown_time = (*data_settings)[MergeTreeSetting::merge_with_ttl_timeout],
.only_drop_parts = false,
.dry_run = dry_run
};
TTLDeleteMergeSelector delete_ttl_selector(params_delete);
parts_to_merge = delete_ttl_selector.select(parts_ranges, max_total_size_to_merge);
if (!parts_to_merge.empty())
@ -499,12 +508,16 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMergeFromRanges(
if (parts_to_merge.empty() && metadata_snapshot->hasAnyRecompressionTTL())
{
TTLRecompressMergeSelector recompress_ttl_selector(
next_recompress_ttl_merge_times_by_partition,
current_time,
(*data_settings)[MergeTreeSetting::merge_with_recompression_ttl_timeout],
metadata_snapshot->getRecompressionTTLs(),
dry_run);
TTLRecompressMergeSelector::Params params
{
.merge_due_times = next_recompress_ttl_merge_times_by_partition,
.current_time = current_time,
.merge_cooldown_time = (*data_settings)[MergeTreeSetting::merge_with_recompression_ttl_timeout],
.recompression_ttls = metadata_snapshot->getRecompressionTTLs(),
.dry_run = dry_run,
};
TTLRecompressMergeSelector recompress_ttl_selector(params);
parts_to_merge = recompress_ttl_selector.select(parts_ranges, max_total_size_to_merge);
if (!parts_to_merge.empty())
@ -514,17 +527,23 @@ SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMergeFromRanges(
if (parts_to_merge.empty())
{
SimpleMergeSelector::Settings merge_settings;
/// Override value from table settings
merge_settings.max_parts_to_merge_at_once = (*data_settings)[MergeTreeSetting::max_parts_to_merge_at_once];
if (!(*data_settings)[MergeTreeSetting::min_age_to_force_merge_on_partition_only])
merge_settings.min_age_to_force_merge = (*data_settings)[MergeTreeSetting::min_age_to_force_merge_seconds];
auto merge_selector_algorithm = (*data_settings)[MergeTreeSetting::merge_selector_algorithm];
std::any merge_settings;
if (merge_selector_algorithm == MergeSelectorAlgorithm::SIMPLE)
{
SimpleMergeSelector::Settings simple_merge_settings;
/// Override value from table settings
simple_merge_settings.max_parts_to_merge_at_once = (*data_settings)[MergeTreeSetting::max_parts_to_merge_at_once];
if (!(*data_settings)[MergeTreeSetting::min_age_to_force_merge_on_partition_only])
simple_merge_settings.min_age_to_force_merge = (*data_settings)[MergeTreeSetting::min_age_to_force_merge_seconds];
if (aggressive)
merge_settings.base = 1;
if (aggressive)
simple_merge_settings.base = 1;
parts_to_merge = SimpleMergeSelector(merge_settings)
.select(parts_ranges, max_total_size_to_merge);
merge_settings = simple_merge_settings;
}
parts_to_merge = MergeSelectorFactory::instance().get(merge_selector_algorithm, merge_settings)->select(parts_ranges, max_total_size_to_merge);
/// Do not allow to "merge" part with itself for regular merges, unless it is a TTL-merge where it is ok to remove some values with expired ttl
if (parts_to_merge.size() == 1)

View File

@ -7,7 +7,7 @@
#include <Common/ActionBlocker.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MutationCommands.h>
#include <Storages/MergeTree/TTLMergeSelector.h>
#include <Storages/MergeTree/MergeSelectors/TTLMergeSelector.h>
#include <Storages/MergeTree/MergeAlgorithm.h>
#include <Storages/MergeTree/MergeType.h>
#include <Storages/MergeTree/MergeTask.h>

View File

@ -1,5 +1,6 @@
#include <Core/BaseSettings.h>
#include <Core/BaseSettingsProgramOptions.h>
#include <Core/MergeSelectorAlgorithm.h>
#include <Core/SettingsChangesHistory.h>
#include <Disks/DiskFomAST.h>
#include <Parsers/ASTCreateQuery.h>
@ -92,6 +93,7 @@ namespace ErrorCodes
M(String, merge_workload, "", "Name of workload to be used to access resources for merges", 0) \
M(String, mutation_workload, "", "Name of workload to be used to access resources for mutations", 0) \
M(Milliseconds, background_task_preferred_step_execution_time_ms, 50, "Target time to execution of one step of merge or mutation. Can be exceeded if one step takes longer time", 0) \
M(MergeSelectorAlgorithm, merge_selector_algorithm, MergeSelectorAlgorithm::SIMPLE, "The algorithm to select parts for merges assignment", 0) \
\
/** Inserts settings. */ \
M(UInt64, parts_to_delay_insert, 1000, "If table contains at least that many active parts in single partition, artificially slow down insert into table. Disabled if set to 0", 0) \

View File

@ -42,6 +42,7 @@ struct MutableColumnsAndConstraints;
M(CLASS_NAME, Int64) \
M(CLASS_NAME, LightweightMutationProjectionMode) \
M(CLASS_NAME, MaxThreads) \
M(CLASS_NAME, MergeSelectorAlgorithm) \
M(CLASS_NAME, Milliseconds) \
M(CLASS_NAME, Seconds) \
M(CLASS_NAME, String) \

View File

@ -3,6 +3,7 @@
#include <Storages/MergeTree/MergeTreeIndices.h>
#include <Storages/MergeTree/MergeTreeSettings.h>
#include <Storages/MergeTree/extractZooKeeperPathFromReplicatedTableDef.h>
#include <Storages/MergeTree/MergeSelectors/registerMergeSelectors.h>
#include <Storages/StorageFactory.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/StorageReplicatedMergeTree.h>
@ -850,6 +851,9 @@ static StoragePtr create(const StorageFactory::Arguments & args)
void registerStorageMergeTree(StorageFactory & factory)
{
/// Part of MergeTree
registerMergeSelectors();
StorageFactory::StorageFeatures features{
.supports_settings = true,
.supports_skipping_indices = true,

View File

@ -1,7 +1,7 @@
#include <iostream>
#include <IO/ReadBufferFromFileDescriptor.h>
#include <IO/ReadHelpers.h>
#include <Storages/MergeTree/SimpleMergeSelector.h>
#include <Storages/MergeTree/MergeSelectors/SimpleMergeSelector.h>
/** This program tests merge-selecting algorithm.

View File

@ -2,7 +2,7 @@
#include <iostream>
#include <IO/ReadBufferFromFileDescriptor.h>
#include <IO/Operators.h>
#include <Storages/MergeTree/SimpleMergeSelector.h>
#include <Storages/MergeTree/MergeSelectors/SimpleMergeSelector.h>
#include <Common/formatReadable.h>