Stress test + reworker executor

This commit is contained in:
Nikita Mikhaylov 2021-08-31 23:20:23 +00:00
parent 28eff0e369
commit ac5316ce98
3 changed files with 158 additions and 83 deletions

View File

@ -11,32 +11,33 @@ void MergeTreeBackgroundExecutor::removeTasksCorrespondingToStorage(StorageID id
{
std::lock_guard remove_lock(remove_mutex);
/// First stop the scheduler thread
std::vector<ItemPtr> tasks_to_wait;
{
std::unique_lock lock(mutex);
shutdown_suspend = true;
has_tasks.notify_one();
std::lock_guard lock(mutex);
/// Mark this StorageID as deleting
currently_deleting.emplace(id);
std::erase_if(pending, [&] (auto item) -> bool { return item->task->getStorageID() == id; });
/// Find pending to wait
for (auto & item : active)
if (item->task->getStorageID() == id)
tasks_to_wait.emplace_back(item);
}
scheduler.join();
/// Remove tasks
for (auto & item : tasks_to_wait)
{
std::lock_guard lock(currently_executing_mutex);
for (auto & [task, future] : currently_executing)
{
if (task->getStorageID() == id)
future.wait();
}
/// Remove tasks from original queue
size_t erased_count = std::erase_if(tasks, [id = std::move(id)] (auto task) -> bool { return task->getStorageID() == id; });
CurrentMetrics::sub(metric, erased_count);
assert(item->future.valid());
item->future.wait();
}
shutdown_suspend = false;
scheduler = ThreadFromGlobalPool([this]() { schedulerThreadFunction(); });
{
std::lock_guard lock(mutex);
currently_deleting.erase(id);
}
}
@ -44,47 +45,54 @@ void MergeTreeBackgroundExecutor::schedulerThreadFunction()
{
while (true)
{
ItemPtr item;
{
std::unique_lock lock(mutex);
has_tasks.wait(lock, [this](){ return !tasks.empty() || shutdown_suspend; });
std::unique_lock lock(mutex);
if (shutdown_suspend)
break;
has_tasks.wait(lock, [this](){ return !pending.empty() || shutdown_suspend; });
item = std::move(tasks.front());
tasks.pop_front();
if (shutdown_suspend)
break;
/// This is needed to increase / decrease the number of threads at runtime
updatePoolConfiguration();
}
auto item = std::move(pending.front());
pending.pop_front();
{
std::lock_guard lock(currently_executing_mutex);
currently_executing.emplace(item);
}
active.emplace(item);
/// This is needed to increase / decrease the number of threads at runtime
updatePoolConfiguration();
bool res = pool.trySchedule([this, item] ()
{
auto on_exit = [&] ()
auto check_if_deleting = [&] () -> bool
{
item->promise.set_value();
active.erase(item);
for (auto & id : currently_deleting)
{
std::lock_guard lock(currently_executing_mutex);
currently_executing.erase(item);
if (item->task->getStorageID() == id)
{
item->promise.set_value();
return true;
}
}
return false;
};
SCOPE_EXIT({ on_exit(); });
SCOPE_EXIT({
std::lock_guard guard(mutex);
check_if_deleting();
});
try
{
bool result = item->task->execute();
if (result)
if (item->task->execute())
{
std::lock_guard guard(mutex);
tasks.emplace_back(item);
if (check_if_deleting())
return;
pending.emplace_back(item);
has_tasks.notify_one();
return;
}
@ -101,13 +109,15 @@ void MergeTreeBackgroundExecutor::schedulerThreadFunction()
has_tasks.notify_one();
tryLogCurrentException(__PRETTY_FUNCTION__);
}
});
if (!res)
{
std::lock_guard guard(mutex);
tasks.emplace_back(item);
active.erase(item);
pending.emplace_back(item);
}
}
}

View File

@ -90,7 +90,7 @@ public:
if (value.load() >= static_cast<int64_t>(max_task_count_getter()))
return false;
tasks.emplace_back(std::make_shared<Item>(std::move(task), metric));
pending.emplace_back(std::make_shared<Item>(std::move(task), metric));
has_tasks.notify_one();
return true;
}
@ -111,15 +111,16 @@ public:
pool.wait();
}
size_t active()
{
return pool.active();
}
size_t pending()
size_t activeCount()
{
std::lock_guard lock(mutex);
return tasks.size();
return active.size();
}
size_t pendingCount()
{
std::lock_guard lock(mutex);
return pending.size();
}
private:
@ -141,24 +142,27 @@ private:
struct Item
{
explicit Item(ExecutableTaskPtr && task_, CurrentMetrics::Metric && metric_)
: task(std::move(task_)), increment(std::move(metric_)) {}
explicit Item(ExecutableTaskPtr && task_, CurrentMetrics::Metric metric_)
: task(std::move(task_))
, increment(std::move(metric_))
, future(promise.get_future())
{
}
ExecutableTaskPtr task;
CurrentMetrics::Increment increment;
std::promise<void> promise;
std::future<void> future;
};
using ItemPtr = std::shared_ptr<Item>;
std::deque<ItemPtr> tasks;
std::deque<ItemPtr> pending;
std::set<ItemPtr> active;
std::set<StorageID> currently_deleting;
std::mutex remove_mutex;
std::mutex currently_executing_mutex;
std::set<ItemPtr> currently_executing;
std::mutex mutex;
std::condition_variable has_tasks;

View File

@ -1,7 +1,9 @@
#include <gtest/gtest.h>
#include <atomic>
#include <barrier>
#include <memory>
#include <random>
#include <Storages/MergeTree/ExecutableTask.h>
#include <Storages/MergeTree/MergeMutateExecutor.h>
@ -13,16 +15,24 @@ namespace CurrentMetrics
extern const Metric BackgroundPoolTask;
}
std::random_device device;
class FakeExecutableTask : public ExecutableTask
{
public:
explicit FakeExecutableTask(String name_, std::function<void()> on_completed_) : name(name_), on_completed(on_completed_)
explicit FakeExecutableTask(String name_) : generator(device()), distribution(0, 5), name(name_)
{
}
bool execute() override
{
std::this_thread::sleep_for(std::chrono::milliseconds(5));
auto sleep_time = distribution(generator);
std::this_thread::sleep_for(std::chrono::milliseconds(5 * sleep_time));
auto choice = distribution(generator);
if (choice == 0)
throw std::runtime_error("Unlucky...");
return false;
}
@ -31,55 +41,106 @@ public:
return {"test", name};
}
void onCompleted() override
{
on_completed();
}
void onCompleted() override {}
private:
std::mt19937 generator;
std::uniform_int_distribution<> distribution;
String name;
std::function<void()> on_completed;
};
TEST(Executor, Simple)
TEST(Executor, RemoveTasks)
{
auto executor = DB::MergeTreeBackgroundExecutor::create();
const size_t tasks_kinds = 25;
const size_t batch = 100;
executor->setThreadsCount([]() { return 25; });
executor->setThreadsCount([]() { return tasks_kinds; });
executor->setTasksCount([] () { return tasks_kinds * batch; });
executor->setMetric(CurrentMetrics::BackgroundPoolTask);
for (size_t i = 0; i < batch; ++i)
{
for (size_t j = 0; j < tasks_kinds; ++j)
{
bool res = executor->trySchedule(std::make_shared<FakeExecutableTask>(std::to_string(j), [](){}));
ASSERT_TRUE(res);
}
}
ASSERT_TRUE(
executor->trySchedule(std::make_shared<FakeExecutableTask>(std::to_string(j)))
);
std::vector<std::thread> threads(batch);
for (auto & thread : threads)
thread = std::thread([&] ()
{
for (size_t j = 0; j < tasks_kinds; ++j)
executor->removeTasksCorrespondingToStorage({"test", std::to_string(j)});
auto remover_routine = [&] ()
{
for (size_t j = 0; j < tasks_kinds; ++j)
executor->removeTasksCorrespondingToStorage({"test", std::to_string(j)});
};
});
for (auto & thread : threads)
thread = std::thread(remover_routine);
for (auto & thread : threads)
thread.join();
ASSERT_EQ(executor->active(), 0);
ASSERT_EQ(executor->pending(), 0);
ASSERT_EQ(executor->activeCount(), 0);
ASSERT_EQ(executor->pendingCount(), 0);
ASSERT_EQ(CurrentMetrics::values[CurrentMetrics::BackgroundPoolTask], 0);
executor->wait();
}
TEST(Executor, RemoveTasksStress)
{
auto executor = DB::MergeTreeBackgroundExecutor::create();
const size_t tasks_kinds = 25;
const size_t batch = 100;
const size_t schedulers_count = 5;
const size_t removers_count = 5;
executor->setThreadsCount([]() { return tasks_kinds; });
executor->setTasksCount([] () { return tasks_kinds * batch * (schedulers_count + removers_count); });
executor->setMetric(CurrentMetrics::BackgroundPoolTask);
std::barrier barrier(schedulers_count + removers_count);
auto scheduler_routine = [&] ()
{
barrier.arrive_and_wait();
for (size_t i = 0; i < batch; ++i)
for (size_t j = 0; j < tasks_kinds; ++j)
executor->trySchedule(std::make_shared<FakeExecutableTask>(std::to_string(j)));
};
auto remover_routine = [&] ()
{
barrier.arrive_and_wait();
for (size_t j = 0; j < tasks_kinds; ++j)
executor->removeTasksCorrespondingToStorage({"test", std::to_string(j)});
};
std::vector<std::thread> schedulers(schedulers_count);
for (auto & scheduler : schedulers)
scheduler = std::thread(scheduler_routine);
std::vector<std::thread> removers(removers_count);
for (auto & remover : removers)
remover = std::thread(remover_routine);
for (auto & scheduler : schedulers)
scheduler.join();
for (auto & remover : removers)
remover.join();
for (size_t j = 0; j < tasks_kinds; ++j)
executor->removeTasksCorrespondingToStorage({"test", std::to_string(j)});
ASSERT_EQ(executor->activeCount(), 0);
ASSERT_EQ(executor->pendingCount(), 0);
ASSERT_EQ(CurrentMetrics::values[CurrentMetrics::BackgroundPoolTask], 0);
executor->wait();
}