diff --git a/src/Storages/MergeTree/MergeMutateExecutor.cpp b/src/Storages/MergeTree/MergeMutateExecutor.cpp index 01f1f7cbcc7..1cf84333dbe 100644 --- a/src/Storages/MergeTree/MergeMutateExecutor.cpp +++ b/src/Storages/MergeTree/MergeMutateExecutor.cpp @@ -11,32 +11,33 @@ void MergeTreeBackgroundExecutor::removeTasksCorrespondingToStorage(StorageID id { std::lock_guard remove_lock(remove_mutex); - /// First stop the scheduler thread + std::vector tasks_to_wait; { - std::unique_lock lock(mutex); - shutdown_suspend = true; - has_tasks.notify_one(); + std::lock_guard lock(mutex); + + /// Mark this StorageID as deleting + currently_deleting.emplace(id); + + std::erase_if(pending, [&] (auto item) -> bool { return item->task->getStorageID() == id; }); + + /// Find pending to wait + for (auto & item : active) + if (item->task->getStorageID() == id) + tasks_to_wait.emplace_back(item); } - scheduler.join(); - /// Remove tasks + for (auto & item : tasks_to_wait) { - std::lock_guard lock(currently_executing_mutex); - - for (auto & [task, future] : currently_executing) - { - if (task->getStorageID() == id) - future.wait(); - } - - /// Remove tasks from original queue - size_t erased_count = std::erase_if(tasks, [id = std::move(id)] (auto task) -> bool { return task->getStorageID() == id; }); - CurrentMetrics::sub(metric, erased_count); + assert(item->future.valid()); + item->future.wait(); } - shutdown_suspend = false; - scheduler = ThreadFromGlobalPool([this]() { schedulerThreadFunction(); }); + + { + std::lock_guard lock(mutex); + currently_deleting.erase(id); + } } @@ -44,47 +45,54 @@ void MergeTreeBackgroundExecutor::schedulerThreadFunction() { while (true) { - ItemPtr item; - { - std::unique_lock lock(mutex); - has_tasks.wait(lock, [this](){ return !tasks.empty() || shutdown_suspend; }); + std::unique_lock lock(mutex); - if (shutdown_suspend) - break; + has_tasks.wait(lock, [this](){ return !pending.empty() || shutdown_suspend; }); - item = std::move(tasks.front()); - tasks.pop_front(); + if (shutdown_suspend) + break; - /// This is needed to increase / decrease the number of threads at runtime - updatePoolConfiguration(); - } + auto item = std::move(pending.front()); + pending.pop_front(); - { - std::lock_guard lock(currently_executing_mutex); - currently_executing.emplace(item); - } + active.emplace(item); + + /// This is needed to increase / decrease the number of threads at runtime + updatePoolConfiguration(); bool res = pool.trySchedule([this, item] () { - auto on_exit = [&] () + auto check_if_deleting = [&] () -> bool { - item->promise.set_value(); + active.erase(item); + + for (auto & id : currently_deleting) { - std::lock_guard lock(currently_executing_mutex); - currently_executing.erase(item); + if (item->task->getStorageID() == id) + { + item->promise.set_value(); + return true; + } } + + return false; }; - SCOPE_EXIT({ on_exit(); }); + SCOPE_EXIT({ + std::lock_guard guard(mutex); + check_if_deleting(); + }); try { - bool result = item->task->execute(); - - if (result) + if (item->task->execute()) { std::lock_guard guard(mutex); - tasks.emplace_back(item); + + if (check_if_deleting()) + return; + + pending.emplace_back(item); has_tasks.notify_one(); return; } @@ -101,13 +109,15 @@ void MergeTreeBackgroundExecutor::schedulerThreadFunction() has_tasks.notify_one(); tryLogCurrentException(__PRETTY_FUNCTION__); } + }); if (!res) { - std::lock_guard guard(mutex); - tasks.emplace_back(item); + active.erase(item); + pending.emplace_back(item); } + } } diff --git a/src/Storages/MergeTree/MergeMutateExecutor.h b/src/Storages/MergeTree/MergeMutateExecutor.h index 504b6e829f8..09de42ccbaf 100644 --- a/src/Storages/MergeTree/MergeMutateExecutor.h +++ b/src/Storages/MergeTree/MergeMutateExecutor.h @@ -90,7 +90,7 @@ public: if (value.load() >= static_cast(max_task_count_getter())) return false; - tasks.emplace_back(std::make_shared(std::move(task), metric)); + pending.emplace_back(std::make_shared(std::move(task), metric)); has_tasks.notify_one(); return true; } @@ -111,15 +111,16 @@ public: pool.wait(); } - size_t active() - { - return pool.active(); - } - - size_t pending() + size_t activeCount() { std::lock_guard lock(mutex); - return tasks.size(); + return active.size(); + } + + size_t pendingCount() + { + std::lock_guard lock(mutex); + return pending.size(); } private: @@ -141,24 +142,27 @@ private: struct Item { - explicit Item(ExecutableTaskPtr && task_, CurrentMetrics::Metric && metric_) - : task(std::move(task_)), increment(std::move(metric_)) {} + explicit Item(ExecutableTaskPtr && task_, CurrentMetrics::Metric metric_) + : task(std::move(task_)) + , increment(std::move(metric_)) + , future(promise.get_future()) + { + } ExecutableTaskPtr task; CurrentMetrics::Increment increment; std::promise promise; + std::future future; }; using ItemPtr = std::shared_ptr; - std::deque tasks; + std::deque pending; + std::set active; + std::set currently_deleting; std::mutex remove_mutex; - - std::mutex currently_executing_mutex; - std::set currently_executing; - std::mutex mutex; std::condition_variable has_tasks; diff --git a/src/Storages/MergeTree/tests/gtest_executor.cpp b/src/Storages/MergeTree/tests/gtest_executor.cpp index 6811c77cb1d..931d8ac8bef 100644 --- a/src/Storages/MergeTree/tests/gtest_executor.cpp +++ b/src/Storages/MergeTree/tests/gtest_executor.cpp @@ -1,7 +1,9 @@ #include #include +#include #include +#include #include #include @@ -13,16 +15,24 @@ namespace CurrentMetrics extern const Metric BackgroundPoolTask; } +std::random_device device; + class FakeExecutableTask : public ExecutableTask { public: - explicit FakeExecutableTask(String name_, std::function on_completed_) : name(name_), on_completed(on_completed_) + explicit FakeExecutableTask(String name_) : generator(device()), distribution(0, 5), name(name_) { } bool execute() override { - std::this_thread::sleep_for(std::chrono::milliseconds(5)); + auto sleep_time = distribution(generator); + std::this_thread::sleep_for(std::chrono::milliseconds(5 * sleep_time)); + + auto choice = distribution(generator); + if (choice == 0) + throw std::runtime_error("Unlucky..."); + return false; } @@ -31,55 +41,106 @@ public: return {"test", name}; } - void onCompleted() override - { - on_completed(); - } + void onCompleted() override {} private: + std::mt19937 generator; + std::uniform_int_distribution<> distribution; String name; std::function on_completed; }; -TEST(Executor, Simple) +TEST(Executor, RemoveTasks) { auto executor = DB::MergeTreeBackgroundExecutor::create(); const size_t tasks_kinds = 25; const size_t batch = 100; - executor->setThreadsCount([]() { return 25; }); + executor->setThreadsCount([]() { return tasks_kinds; }); executor->setTasksCount([] () { return tasks_kinds * batch; }); executor->setMetric(CurrentMetrics::BackgroundPoolTask); for (size_t i = 0; i < batch; ++i) - { for (size_t j = 0; j < tasks_kinds; ++j) - { - bool res = executor->trySchedule(std::make_shared(std::to_string(j), [](){})); - ASSERT_TRUE(res); - } - } + ASSERT_TRUE( + executor->trySchedule(std::make_shared(std::to_string(j))) + ); std::vector threads(batch); - for (auto & thread : threads) - thread = std::thread([&] () - { - for (size_t j = 0; j < tasks_kinds; ++j) - executor->removeTasksCorrespondingToStorage({"test", std::to_string(j)}); + auto remover_routine = [&] () + { + for (size_t j = 0; j < tasks_kinds; ++j) + executor->removeTasksCorrespondingToStorage({"test", std::to_string(j)}); + }; - }); + for (auto & thread : threads) + thread = std::thread(remover_routine); for (auto & thread : threads) thread.join(); - ASSERT_EQ(executor->active(), 0); - ASSERT_EQ(executor->pending(), 0); + ASSERT_EQ(executor->activeCount(), 0); + ASSERT_EQ(executor->pendingCount(), 0); + ASSERT_EQ(CurrentMetrics::values[CurrentMetrics::BackgroundPoolTask], 0); + + executor->wait(); +} + + +TEST(Executor, RemoveTasksStress) +{ + auto executor = DB::MergeTreeBackgroundExecutor::create(); + + const size_t tasks_kinds = 25; + const size_t batch = 100; + const size_t schedulers_count = 5; + const size_t removers_count = 5; + + executor->setThreadsCount([]() { return tasks_kinds; }); + executor->setTasksCount([] () { return tasks_kinds * batch * (schedulers_count + removers_count); }); + executor->setMetric(CurrentMetrics::BackgroundPoolTask); + + std::barrier barrier(schedulers_count + removers_count); + + auto scheduler_routine = [&] () + { + barrier.arrive_and_wait(); + for (size_t i = 0; i < batch; ++i) + for (size_t j = 0; j < tasks_kinds; ++j) + executor->trySchedule(std::make_shared(std::to_string(j))); + }; + + auto remover_routine = [&] () + { + barrier.arrive_and_wait(); + for (size_t j = 0; j < tasks_kinds; ++j) + executor->removeTasksCorrespondingToStorage({"test", std::to_string(j)}); + }; + + std::vector schedulers(schedulers_count); + for (auto & scheduler : schedulers) + scheduler = std::thread(scheduler_routine); + + std::vector removers(removers_count); + for (auto & remover : removers) + remover = std::thread(remover_routine); + + for (auto & scheduler : schedulers) + scheduler.join(); + + for (auto & remover : removers) + remover.join(); + + for (size_t j = 0; j < tasks_kinds; ++j) + executor->removeTasksCorrespondingToStorage({"test", std::to_string(j)}); + + ASSERT_EQ(executor->activeCount(), 0); + ASSERT_EQ(executor->pendingCount(), 0); ASSERT_EQ(CurrentMetrics::values[CurrentMetrics::BackgroundPoolTask], 0); executor->wait(); - }