add config option to select scheduling policy

This commit is contained in:
serxa 2023-02-11 16:18:42 +00:00
parent fbedf4d520
commit c58b165b0f
8 changed files with 181 additions and 15 deletions

View File

@ -1010,6 +1010,24 @@ Default value: 2.
<background_merges_mutations_concurrency_ratio>3</background_merges_mutations_concurrency_ratio>
```

## background_merges_mutations_scheduling_policy {#background_merges_mutations_scheduling_policy}

The algorithm used to select the next merge or mutation to be executed by the background thread pool. The policy can be changed at runtime without a server restart. For backward compatibility it can also be applied at startup from the `default` profile.

Possible values:

- "round_robin" — Every concurrent merge and mutation is executed in round-robin order to ensure starvation-free operation. Smaller merges complete faster than bigger ones simply because they have fewer blocks to merge.
- "shortest_task_first" — Always execute the smaller merge or mutation. Merges and mutations are assigned priorities based on their resulting size; smaller merges are strictly preferred over bigger ones. This policy ensures the fastest possible merging of small parts, but it can lead to indefinite starvation of big merges in partitions heavily overloaded by INSERTs.

Default value: "round_robin".

**Example**

```xml
<background_merges_mutations_scheduling_policy>shortest_task_first</background_merges_mutations_scheduling_policy>
```
## background_move_pool_size {#background_move_pool_size}
Sets the number of threads performing background moves for tables with MergeTree engines. Could be increased at runtime and could be applied at server startup from the `default` profile for backward compatibility.

View File

@ -1282,6 +1282,8 @@ try
auto new_pool_size = config->getUInt64("background_pool_size", 16);
auto new_ratio = config->getUInt64("background_merges_mutations_concurrency_ratio", 2);
global_context->getMergeMutateExecutor()->increaseThreadsAndMaxTasksCount(new_pool_size, new_pool_size * new_ratio);
auto new_scheduling_policy = config->getString("background_merges_mutations_scheduling_policy", "round_robin");
global_context->getMergeMutateExecutor()->updateSchedulingPolicy(new_scheduling_policy);
}
if (global_context->areBackgroundExecutorsInitialized() && config->has("background_move_pool_size"))

View File

@ -339,6 +339,7 @@
<background_buffer_flush_schedule_pool_size>16</background_buffer_flush_schedule_pool_size>
<background_pool_size>16</background_pool_size>
<background_merges_mutations_concurrency_ratio>2</background_merges_mutations_concurrency_ratio>
<background_merges_mutations_scheduling_policy>round_robin</background_merges_mutations_scheduling_policy>
<background_move_pool_size>8</background_move_pool_size>
<background_fetches_pool_size>8</background_fetches_pool_size>
<background_common_pool_size>8</background_common_pool_size>

View File

@ -3746,6 +3746,12 @@ void Context::initializeBackgroundExecutorsIfNeeded()
else if (config.has("profiles.default.background_merges_mutations_concurrency_ratio"))
background_merges_mutations_concurrency_ratio = config.getUInt64("profiles.default.background_merges_mutations_concurrency_ratio");
String background_merges_mutations_scheduling_policy = "round_robin";
if (config.has("background_merges_mutations_scheduling_policy"))
background_merges_mutations_scheduling_policy = config.getString("background_merges_mutations_scheduling_policy");
else if (config.has("profiles.default.background_merges_mutations_scheduling_policy"))
background_merges_mutations_scheduling_policy = config.getString("profiles.default.background_merges_mutations_scheduling_policy");
size_t background_move_pool_size = 8;
if (config.has("background_move_pool_size"))
background_move_pool_size = config.getUInt64("background_move_pool_size");
@ -3772,8 +3778,9 @@ void Context::initializeBackgroundExecutorsIfNeeded()
/*max_tasks_count*/background_pool_size * background_merges_mutations_concurrency_ratio,
CurrentMetrics::BackgroundMergesAndMutationsPoolTask
);
LOG_INFO(shared->log, "Initialized background executor for merges and mutations with num_threads={}, num_tasks={}",
background_pool_size, background_pool_size * background_merges_mutations_concurrency_ratio);
shared->merge_mutate_executor->updateSchedulingPolicy(background_merges_mutations_scheduling_policy);
LOG_INFO(shared->log, "Initialized background executor for merges and mutations with num_threads={}, num_tasks={}, scheduling_policy={}",
background_pool_size, background_pool_size * background_merges_mutations_concurrency_ratio, background_merges_mutations_scheduling_policy);
shared->moves_executor = std::make_shared<OrdinaryBackgroundExecutor>
(

View File

@ -130,13 +130,15 @@ using StoragePolicySelectorPtr = std::shared_ptr<const StoragePolicySelector>;
template <class Queue>
class MergeTreeBackgroundExecutor;
/// Concurrent merges are scheduled using `RoundRobinRuntimeQueue` to ensure fair and starvation-free operation.
/// Scheduling policy can be changed using `background_merges_mutations_scheduling_policy` config option.
/// By default concurrent merges are scheduled using "round_robin" to ensure fair and starvation-free operation.
/// Previously in heavily overloaded shards big merges could possibly be starved by smaller
/// merges due to the use of strict priority scheduling `PriorityRuntimeQueue`.
class RoundRobinRuntimeQueue;
using MergeMutateBackgroundExecutor = MergeTreeBackgroundExecutor<RoundRobinRuntimeQueue>;
/// merges due to the use of strict priority scheduling "shortest_task_first".
class DynamicRuntimeQueue;
using MergeMutateBackgroundExecutor = MergeTreeBackgroundExecutor<DynamicRuntimeQueue>;
using MergeMutateBackgroundExecutorPtr = std::shared_ptr<MergeMutateBackgroundExecutor>;
class RoundRobinRuntimeQueue;
using OrdinaryBackgroundExecutor = MergeTreeBackgroundExecutor<RoundRobinRuntimeQueue>;
using OrdinaryBackgroundExecutorPtr = std::shared_ptr<OrdinaryBackgroundExecutor>;
struct PartUUIDs;

View File

@ -270,5 +270,6 @@ void MergeTreeBackgroundExecutor<Queue>::threadFunction()
template class MergeTreeBackgroundExecutor<RoundRobinRuntimeQueue>;
template class MergeTreeBackgroundExecutor<PriorityRuntimeQueue>;
template class MergeTreeBackgroundExecutor<DynamicRuntimeQueue>;
}

View File

@ -6,7 +6,9 @@
#include <future>
#include <condition_variable>
#include <set>
#include <iostream>
#include <variant>
#include <utility>
#include <boost/circular_buffer.hpp>
#include <boost/noncopyable.hpp>
@ -78,7 +80,10 @@ public:
return result;
}
void push(TaskRuntimeDataPtr item) { queue.push_back(std::move(item));}
void push(TaskRuntimeDataPtr item)
{
queue.push_back(std::move(item));
}
void remove(StorageID id)
{
@ -90,6 +95,8 @@ public:
void setCapacity(size_t count) { queue.set_capacity(count); }
bool empty() { return queue.empty(); }
static constexpr std::string_view name = "round_robin";
private:
boost::circular_buffer<TaskRuntimeDataPtr> queue{0};
};
@ -122,10 +129,82 @@ public:
void setCapacity(size_t count) { buffer.reserve(count); }
bool empty() { return buffer.empty(); }
static constexpr std::string_view name = "shortest_task_first";
private:
std::vector<TaskRuntimeDataPtr> buffer;
};
/// Queue that can dynamically change scheduling policy
template <class ... Policies>
class DynamicRuntimeQueueImpl
{
public:
TaskRuntimeDataPtr pop()
{
return std::visit<TaskRuntimeDataPtr>([&] (auto && queue) { return queue.pop(); }, impl);
}
void push(TaskRuntimeDataPtr item)
{
std::visit([&] (auto && queue) { queue.push(std::move(item)); }, impl);
}
void remove(StorageID id)
{
std::visit([&] (auto && queue) { queue.remove(id); }, impl);
}
void setCapacity(size_t count)
{
capacity = count;
std::visit([&] (auto && queue) { queue.setCapacity(count); }, impl);
}
bool empty()
{
return std::visit<bool>([&] (auto && queue) { return queue.empty(); }, impl);
}
// Change policy. It does nothing if the new policy is unknown or equal to the current one.
void updatePolicy(std::string_view name)
{
// We use this double-lambda trick to generate code for all possible pairs of old and new queue types.
// If the types differ, it moves tasks from the old queue to the new one using the corresponding pop() and push().
resolve<Policies...>(name, [&] <class NewQueue> (std::in_place_type_t<NewQueue>)
{
std::visit([&] (auto && queue)
{
if constexpr (std::is_same_v<std::decay_t<decltype(queue)>, NewQueue>)
return; // The same policy
NewQueue new_queue;
new_queue.setCapacity(capacity);
while (!queue.empty())
new_queue.push(queue.pop());
impl = std::move(new_queue);
}, impl);
});
}
private:
// Find the policy with the specified `name` and call `func()` if found.
// The tag `std::in_place_type_t<T>` helps the templated lambda deduce type T without creating an instance of it.
template <class T, class ... Ts, class Func>
void resolve(std::string_view name, Func && func)
{
if (T::name == name)
return func(std::in_place_type<T>);
if constexpr (sizeof...(Ts))
return resolve<Ts...>(name, std::forward<Func>(func));
}
std::variant<Policies...> impl;
size_t capacity;
};
// Use inheritance instead of a typedef/alias so that DynamicRuntimeQueue can be forward declared.
class DynamicRuntimeQueue : public DynamicRuntimeQueueImpl<RoundRobinRuntimeQueue, PriorityRuntimeQueue> {};
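The `resolve`/`std::visit` combination above is the core of the dynamic queue: `std::in_place_type_t` maps a policy name to a type at compile time, and `std::visit` then generates code for every pair of old and new queue types. Below is a minimal, self-contained sketch of the same pattern; the names `Fifo`, `Lifo`, and `DynamicPolicy` are illustrative only and do not appear in the commit.

```cpp
#include <cassert>
#include <string_view>
#include <type_traits>
#include <utility>
#include <variant>

struct Fifo { static constexpr std::string_view name = "fifo"; };
struct Lifo { static constexpr std::string_view name = "lifo"; };

template <class... Policies>
struct DynamicPolicy
{
    // Switch to the policy whose static `name` matches; unknown names are ignored.
    void update(std::string_view name)
    {
        resolve<Policies...>(name, [&]<class NewPolicy>(std::in_place_type_t<NewPolicy>)
        {
            std::visit([&](auto && old_policy)
            {
                if constexpr (std::is_same_v<std::decay_t<decltype(old_policy)>, NewPolicy>)
                    return; // already using this policy
                impl = NewPolicy{}; // in the real queue, tasks would be moved from `old_policy` here
            }, impl);
        });
    }

    std::string_view current() const
    {
        return std::visit([](auto && p) { return std::decay_t<decltype(p)>::name; }, impl);
    }

private:
    // Find the policy with the given name and call `func` with a tag carrying its type.
    template <class T, class... Ts, class Func>
    void resolve(std::string_view name, Func && func)
    {
        if (T::name == name)
            return func(std::in_place_type<T>);
        if constexpr (sizeof...(Ts))
            return resolve<Ts...>(name, std::forward<Func>(func));
    }

    std::variant<Policies...> impl;
};

int main()
{
    DynamicPolicy<Fifo, Lifo> policy;
    assert(policy.current() == "fifo");
    policy.update("lifo");
    assert(policy.current() == "lifo");
    policy.update("unknown"); // no-op
    assert(policy.current() == "lifo");
}
```

Matching on a static `name` member keeps the mapping from configuration string to queue type in one place, which is what allows `updateSchedulingPolicy` to accept the raw value of `background_merges_mutations_scheduling_policy`.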
/**
* Executor for a background MergeTree related operations such as merges, mutations, fetches and so on.
* It can execute only successors of ExecutableTask interface.
@ -206,6 +285,14 @@ public:
void removeTasksCorrespondingToStorage(StorageID id);
void wait();
/// Update scheduling policy for pending tasks (only available when the queue supports it).
void updateSchedulingPolicy(std::string_view new_policy)
requires requires(Queue queue) { queue.updatePolicy(new_policy); } // Because we use explicit template instantiation
{
std::lock_guard lock(mutex);
pending.updatePolicy(new_policy);
}
private:
String name;
size_t threads_count TSA_GUARDED_BY(mutex) = 0;
@ -229,5 +316,6 @@ private:
extern template class MergeTreeBackgroundExecutor<RoundRobinRuntimeQueue>;
extern template class MergeTreeBackgroundExecutor<PriorityRuntimeQueue>;
extern template class MergeTreeBackgroundExecutor<DynamicRuntimeQueue>;
}
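The `requires` clause on `updateSchedulingPolicy` is needed because the executor is explicitly instantiated for all three queue types above, and only the dynamic queue provides `updatePolicy`; the constraint keeps the member out of instantiations that cannot support it. A minimal sketch of that mechanism, with illustrative types that are not part of the commit:

```cpp
#include <string_view>

struct StaticQueue {};                                             // no updatePolicy()
struct SwitchableQueue { void updatePolicy(std::string_view) {} };

template <class Queue>
struct Executor
{
    Queue pending;

    // Declared only for queue types that provide updatePolicy(), so the
    // explicit instantiation with StaticQueue below still compiles.
    void updateSchedulingPolicy(std::string_view new_policy)
        requires requires(Queue q) { q.updatePolicy(new_policy); }
    {
        pending.updatePolicy(new_policy);
    }
};

template struct Executor<StaticQueue>;      // ok: the constrained member is skipped
template struct Executor<SwitchableQueue>;  // ok: the constrained member is instantiated

int main()
{
    Executor<SwitchableQueue> executor;
    executor.updateSchedulingPolicy("round_robin");
    // Executor<StaticQueue>{}.updateSchedulingPolicy("x"); // would not compile
}
```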

View File

@ -9,6 +9,7 @@
#include <Storages/MergeTree/IExecutableTask.h>
#include <Storages/MergeTree/MergeTreeBackgroundExecutor.h>
#include <barrier> // needed for std::barrier used in the UpdatePolicy test below
using namespace DB;
namespace CurrentMetrics
@ -63,10 +64,11 @@ using StepFunc = std::function<void(const String & name, size_t steps_left)>;
class LambdaExecutableTask : public IExecutableTask
{
public:
explicit LambdaExecutableTask(const String & name_, size_t step_count_, StepFunc step_func_ = {})
explicit LambdaExecutableTask(const String & name_, size_t step_count_, StepFunc step_func_ = {}, UInt64 priority_ = 0)
: name(name_)
, step_count(step_count_)
, step_func(step_func_)
, priority(priority_)
{}
bool executeStep() override
@ -82,12 +84,14 @@ public:
}
void onCompleted() override {}
UInt64 getPriority() override { return 0; }
UInt64 getPriority() override { return priority; }
private:
String name;
size_t step_count;
StepFunc step_func;
UInt64 priority;
};
@ -116,11 +120,11 @@ TEST(Executor, Simple)
// This is required to check scheduling properties of round-robin in a deterministic way.
auto init_task = [&] (const String &, size_t)
{
executor->trySchedule(std::make_shared<LambdaExecutableTask>("A", 3, task));
executor->trySchedule(std::make_shared<LambdaExecutableTask>("B", 4, task));
executor->trySchedule(std::make_shared<LambdaExecutableTask>("C", 5, task));
executor->trySchedule(std::make_shared<LambdaExecutableTask>("D", 6, task));
executor->trySchedule(std::make_shared<LambdaExecutableTask>("E", 1, task));
executor->trySchedule(std::make_shared<LambdaExecutableTask>("A", 3, task));
executor->trySchedule(std::make_shared<LambdaExecutableTask>("B", 4, task));
executor->trySchedule(std::make_shared<LambdaExecutableTask>("C", 5, task));
executor->trySchedule(std::make_shared<LambdaExecutableTask>("D", 6, task));
executor->trySchedule(std::make_shared<LambdaExecutableTask>("E", 1, task));
};
executor->trySchedule(std::make_shared<LambdaExecutableTask>("init_task", 1, init_task));
@ -222,3 +226,46 @@ TEST(Executor, RemoveTasksStress)
ASSERT_EQ(CurrentMetrics::values[CurrentMetrics::BackgroundMergesAndMutationsPoolTask], 0);
}
TEST(Executor, UpdatePolicy)
{
auto executor = std::make_shared<DB::MergeTreeBackgroundExecutor<DynamicRuntimeQueue>>
(
"GTest",
1, // threads
100, // max_tasks
CurrentMetrics::BackgroundMergesAndMutationsPoolTask
);
String schedule; // mutex is not required because we have a single worker
String expected_schedule = "ABCDEDDDDDCCBACBACB";
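// Phases: "ABCDE" under round_robin, "DDDDDCC" after switching to shortest_task_first at step 5,
// and "BACBACB" after switching back to round_robin at step 12.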
std::barrier barrier(2);
auto task = [&] (const String & name, size_t)
{
schedule += name;
if (schedule.size() == 5)
executor->updateSchedulingPolicy(PriorityRuntimeQueue::name);
if (schedule.size() == 12)
executor->updateSchedulingPolicy(RoundRobinRuntimeQueue::name);
if (schedule.size() == expected_schedule.size())
barrier.arrive_and_wait();
};
// Schedule tasks from this `init_task` to guarantee atomicity.
// The worker sees the pending queue only after all tasks have been pushed.
// This is required to check scheduling properties in a deterministic way.
auto init_task = [&] (const String &, size_t)
{
executor->trySchedule(std::make_shared<LambdaExecutableTask>("A", 3, task, 5));
executor->trySchedule(std::make_shared<LambdaExecutableTask>("B", 4, task, 4));
executor->trySchedule(std::make_shared<LambdaExecutableTask>("C", 5, task, 3));
executor->trySchedule(std::make_shared<LambdaExecutableTask>("D", 6, task, 2));
executor->trySchedule(std::make_shared<LambdaExecutableTask>("E", 1, task, 1));
};
executor->trySchedule(std::make_shared<LambdaExecutableTask>("init_task", 1, init_task));
barrier.arrive_and_wait(); // Do not finish until tasks are done
executor->wait();
ASSERT_EQ(schedule, expected_schedule);
}