add merge_workload and mutation_workload settings for server and merge tree

This commit is contained in:
serxa 2024-05-17 17:10:40 +00:00
parent ad5403a034
commit 96e19ac548
11 changed files with 71 additions and 8 deletions

View File

@ -1557,6 +1557,10 @@ try
0, // We don't need any threads one all the parts will be deleted
new_server_settings.max_parts_cleaning_thread_pool_size);
global_context->setMergeWorkload(new_server_settings.merge_workload);
global_context->setMutationWorkload(new_server_settings.mutation_workload);
if (config->has("resources"))
{
global_context->getResourceManager()->updateConfiguration(*config);

View File

@ -1396,6 +1396,14 @@
<!-- <host_name>replica</host_name> -->
</distributed_ddl>
<!-- Used to regulate how resources are utilized and shared between merges, mutations and other workloads.
Specified value is used as `workload` setting value for background merge or mutation.
-->
<!--
<merge_workload>merges_and_mutations</merge_workload>
<mutation_workload>merges_and_mutations</mutation_workload>
-->
<!-- Settings to fine-tune MergeTree tables. See documentation in source code, in MergeTreeSettings.h -->
<!--
<merge_tree>

View File

@ -142,6 +142,8 @@ namespace DB
M(UInt64, global_profiler_real_time_period_ns, 0, "Period for real clock timer of global profiler (in nanoseconds). Set 0 value to turn off the real clock global profiler. Recommended value is at least 10000000 (100 times a second) for single queries or 1000000000 (once a second) for cluster-wide profiling.", 0) \
M(UInt64, global_profiler_cpu_time_period_ns, 0, "Period for CPU clock timer of global profiler (in nanoseconds). Set 0 value to turn off the CPU clock global profiler. Recommended value is at least 10000000 (100 times a second) for single queries or 1000000000 (once a second) for cluster-wide profiling.", 0) \
M(Bool, enable_azure_sdk_logging, false, "Enables logging from Azure sdk", 0) \
M(String, merge_workload, "default", "Name of workload to be used to access resources for all merges (may be overriden by a merge tree setting)", 0) \
M(String, mutation_workload, "default", "Name of workload to be used to access resources for all mutations (may be overriden by a merge tree setting)", 0) \
/// If you add a setting which can be updated at runtime, please update 'changeable_settings' map in StorageSystemServerSettings.cpp

View File

@ -277,6 +277,8 @@ struct ContextSharedPart : boost::noncopyable
String default_profile_name; /// Default profile name used for default values.
String system_profile_name; /// Profile used by system processes
String buffer_profile_name; /// Profile used by Buffer engine for flushing to the underlying
String merge_workload TSA_GUARDED_BY(mutex); /// Workload setting value that is used by all merges
String mutation_workload TSA_GUARDED_BY(mutex); /// Workload setting value that is used by all mutations
std::unique_ptr<AccessControl> access_control TSA_GUARDED_BY(mutex);
mutable OnceFlag resource_manager_initialized;
mutable ResourceManagerPtr resource_manager;
@ -1543,11 +1545,36 @@ ResourceManagerPtr Context::getResourceManager() const
ClassifierPtr Context::getWorkloadClassifier() const
{
std::lock_guard lock(mutex);
// NOTE: Workload cannot be changed after query start, and getWorkloadClassifier() should not be called before proper `workload` is set
if (!classifier)
classifier = getResourceManager()->acquire(getSettingsRef().workload);
return classifier;
}
String Context::getMergeWorkload() const
{
SharedLockGuard lock(shared->mutex);
return shared->merge_workload;
}
void Context::setMergeWorkload(const String & value)
{
std::lock_guard lock(shared->mutex);
shared->merge_workload = value;
}
String Context::getMutationWorkload() const
{
SharedLockGuard lock(shared->mutex);
return shared->mutation_workload;
}
void Context::setMutationWorkload(const String & value)
{
std::lock_guard lock(shared->mutex);
shared->mutation_workload = value;
}
Scalars Context::getScalars() const
{
@ -2471,6 +2498,20 @@ void Context::makeQueryContext()
backups_query_throttler.reset();
}
void Context::makeQueryContextForMerge(const MergeTreeSettings & merge_tree_settings)
{
makeQueryContext();
classifier.reset(); // It is assumed that there are no active queries running using this classifier, otherwise this will lead to crashes
settings.workload = merge_tree_settings.merge_workload.value.empty() ? getMergeWorkload() : merge_tree_settings.merge_workload;
}
void Context::makeQueryContextForMutate(const MergeTreeSettings & merge_tree_settings)
{
makeQueryContext();
classifier.reset(); // It is assumed that there are no active queries running using this classifier, otherwise this will lead to crashes
settings.workload = merge_tree_settings.mutation_workload.value.empty() ? getMutationWorkload() : merge_tree_settings.mutation_workload;
}
void Context::makeSessionContext()
{
session_context = shared_from_this();

View File

@ -622,6 +622,10 @@ public:
/// Resource management related
ResourceManagerPtr getResourceManager() const;
ClassifierPtr getWorkloadClassifier() const;
String getMergeWorkload() const;
void setMergeWorkload(const String & value);
String getMutationWorkload() const;
void setMutationWorkload(const String & value);
/// We have to copy external tables inside executeQuery() to track limits. Therefore, set callback for it. Must set once.
void setExternalTablesInitializer(ExternalTablesInitializer && initializer);
@ -896,6 +900,8 @@ public:
void setSessionContext(ContextMutablePtr context_) { session_context = context_; }
void makeQueryContext();
void makeQueryContextForMerge(const MergeTreeSettings & merge_tree_settings);
void makeQueryContextForMutate(const MergeTreeSettings & merge_tree_settings);
void makeSessionContext();
void makeGlobalContext();

View File

@ -310,7 +310,7 @@ ReplicatedMergeMutateTaskBase::PrepareResult MergeFromLogEntryTask::prepare()
auto table_id = storage.getStorageID();
task_context = Context::createCopy(storage.getContext());
task_context->makeQueryContext();
task_context->makeQueryContextForMerge(*storage.getSettings());
task_context->setCurrentQueryId(getQueryId());
/// Add merge to list

View File

@ -165,7 +165,7 @@ void MergePlainMergeTreeTask::finish()
ContextMutablePtr MergePlainMergeTreeTask::createTaskContext() const
{
auto context = Context::createCopy(storage.getContext());
context->makeQueryContext();
context->makeQueryContextForMerge(*storage.getSettings());
auto queryId = getQueryId();
context->setCurrentQueryId(queryId);
return context;

View File

@ -137,7 +137,7 @@ private:
virtual ~IStage() = default;
};
/// By default this context is uninitialed, but some variables has to be set after construction,
/// By default this context is uninitialized, but some variables has to be set after construction,
/// some variables are used in a process of execution
/// Proper initialization is responsibility of the author
struct GlobalRuntimeContext : public IStageRuntimeContext
@ -199,7 +199,7 @@ private:
using GlobalRuntimeContextPtr = std::shared_ptr<GlobalRuntimeContext>;
/// By default this context is uninitialed, but some variables has to be set after construction,
/// By default this context is uninitialized, but some variables has to be set after construction,
/// some variables are used in a process of execution
/// Proper initialization is responsibility of the author
struct ExecuteAndFinalizeHorizontalPartRuntimeContext : public IStageRuntimeContext
@ -272,7 +272,7 @@ private:
GlobalRuntimeContextPtr global_ctx;
};
/// By default this context is uninitialed, but some variables has to be set after construction,
/// By default this context is uninitialized, but some variables has to be set after construction,
/// some variables are used in a process of execution
/// Proper initialization is responsibility of the author
struct VerticalMergeRuntimeContext : public IStageRuntimeContext
@ -344,7 +344,7 @@ private:
GlobalRuntimeContextPtr global_ctx;
};
/// By default this context is uninitialed, but some variables has to be set after construction,
/// By default this context is uninitialized, but some variables has to be set after construction,
/// some variables are used in a process of execution
/// Proper initialization is responsibility of the author
struct MergeProjectionsRuntimeContext : public IStageRuntimeContext

View File

@ -81,6 +81,8 @@ struct Settings;
M(UInt64, min_delay_to_mutate_ms, 10, "Min delay of mutating MergeTree table in milliseconds, if there are a lot of unfinished mutations", 0) \
M(UInt64, max_delay_to_mutate_ms, 1000, "Max delay of mutating MergeTree table in milliseconds, if there are a lot of unfinished mutations", 0) \
M(Bool, exclude_deleted_rows_for_part_size_in_merge, false, "Use an estimated source part size (excluding lightweight deleted rows) when selecting parts to merge", 0) \
M(String, merge_workload, "", "Name of workload to be used to access resources for merges", 0) \
M(String, mutation_workload, "", "Name of workload to be used to access resources for mutations", 0) \
\
/** Inserts settings. */ \
M(UInt64, parts_to_delay_insert, 1000, "If table contains at least that many active parts in single partition, artificially slow down insert into table. Disabled if set to 0", 0) \

View File

@ -204,7 +204,7 @@ ReplicatedMergeMutateTaskBase::PrepareResult MutateFromLogEntryTask::prepare()
}
task_context = Context::createCopy(storage.getContext());
task_context->makeQueryContext();
task_context->makeQueryContextForMutate(*storage.getSettings());
task_context->setCurrentQueryId(getQueryId());
merge_mutate_entry = storage.getContext()->getMergeList().insert(

View File

@ -136,7 +136,7 @@ bool MutatePlainMergeTreeTask::executeStep()
ContextMutablePtr MutatePlainMergeTreeTask::createTaskContext() const
{
auto context = Context::createCopy(storage.getContext());
context->makeQueryContext();
context->makeQueryContextForMutate(*storage.getSettings());
auto queryId = getQueryId();
context->setCurrentQueryId(queryId);
return context;