diff --git a/src/Storages/MergeTree/BackgroundJobsExecutor.cpp b/src/Storages/MergeTree/BackgroundJobsExecutor.cpp
index 11039edc057..ed6f353c219 100644
--- a/src/Storages/MergeTree/BackgroundJobsExecutor.cpp
+++ b/src/Storages/MergeTree/BackgroundJobsExecutor.cpp
@@ -13,7 +13,6 @@ BackgroundJobAssignee::BackgroundJobAssignee(MergeTreeData & data_, BackgroundJo
     , data(data_)
     , sleep_settings(global_context_->getBackgroundMoveTaskSchedulingSettings())
     , rng(randomSeed())
-    , storage_id(data.getStorageID())
     , type(type_)
 {
 }
@@ -107,6 +106,7 @@ void BackgroundJobAssignee::finish()
         holder->deactivate();
 
         auto context = getContext();
+        auto storage_id = data.getStorageID();
 
         context->getMovesExecutor()->removeTasksCorrespondingToStorage(storage_id);
         context->getFetchesExecutor()->removeTasksCorrespondingToStorage(storage_id);
diff --git a/src/Storages/MergeTree/BackgroundJobsExecutor.h b/src/Storages/MergeTree/BackgroundJobsExecutor.h
index d89d8721697..aba30aa06e7 100644
--- a/src/Storages/MergeTree/BackgroundJobsExecutor.h
+++ b/src/Storages/MergeTree/BackgroundJobsExecutor.h
@@ -48,9 +48,6 @@ private:
     /// Mutex for thread safety
     std::mutex holder_mutex;
 
-    /// Save storage id to prevent use-after-free in destructor
-    StorageID storage_id;
-
 public:
     enum class Type
     {
diff --git a/src/Storages/MergeTree/MergeMutateExecutor.cpp b/src/Storages/MergeTree/MergeMutateExecutor.cpp
index cf7283d9c2b..e28c5b7f867 100644
--- a/src/Storages/MergeTree/MergeMutateExecutor.cpp
+++ b/src/Storages/MergeTree/MergeMutateExecutor.cpp
@@ -6,12 +6,56 @@
 namespace DB
 {
 
+/// Drops every task that belongs to the storage `id` from this executor.
+/// The scheduler thread is stopped for the duration of the call, the tasks of the storage
+/// which are already running in the pool are waited for, and the ones still in the queue
+/// are erased. The scheduler thread is started again before returning.
+void MergeTreeBackgroundExecutor::removeTasksCorrespondingToStorage(StorageID id)
+{
+    /// Only one removal may stop and restart the scheduler at a time
+    std::lock_guard remove_lock(remove_mutex);
+
+    /// First stop the scheduler thread
+    {
+        std::unique_lock lock(mutex);
+        shutdown_suspend = true;
+        has_tasks.notify_one();
+    }
+
+    scheduler.join();
+
+    /// Wait for the tasks of this storage which are already running. `mutex` must not be held here:
+    /// a finishing task takes `mutex` to put itself back to the queue before its promise is set.
+    {
+        std::lock_guard lock(currently_executing_mutex);
+
+        for (auto & [task, future] : currently_executing)
+        {
+            if (task->getStorageID() == id)
+                future.wait();
+        }
+    }
+
+    /// Remove tasks from original queue.
+    /// The queue is guarded by `mutex`: tasks running in the pool append to it under that mutex.
+    {
+        std::lock_guard lock(mutex);
+
+        size_t erased_count = std::erase_if(tasks, [&id] (const auto & task) -> bool { return task->getStorageID() == id; });
+        CurrentMetrics::sub(metric, erased_count);
+    }
+
+    shutdown_suspend = false;
+    scheduler = ThreadFromGlobalPool([this]() { schedulerThreadFunction(); });
+}
+
 
 void MergeTreeBackgroundExecutor::schedulerThreadFunction()
 {
     while (true)
     {
         ExecutableTaskPtr current;
+        auto current_promise = std::make_shared<std::promise<void>>();
         {
             std::unique_lock lock(mutex);
             has_tasks.wait(lock, [this](){ return !tasks.empty() || shutdown_suspend; });
@@ -26,11 +70,29 @@ void MergeTreeBackgroundExecutor::schedulerThreadFunction()
             updatePoolConfiguration();
         }
 
-        bool res = pool.trySchedule([this, task = current] ()
         {
+            std::lock_guard lock(currently_executing_mutex);
+            currently_executing.emplace(current, current_promise->get_future());
+        }
+
+        bool res = pool.trySchedule([this, task = current, promise = current_promise] () mutable
+        {
+            auto on_exit = [&] ()
+            {
+                promise->set_value();
+                {
+                    std::lock_guard lock(currently_executing_mutex);
+                    currently_executing.erase(task);
+                }
+            };
+
+            SCOPE_EXIT({ on_exit(); });
+
             try
             {
-                if (task->execute())
+                bool result = task->execute();
+
+                if (result)
                 {
                     std::lock_guard guard(mutex);
                     tasks.emplace_back(task);
diff --git a/src/Storages/MergeTree/MergeMutateExecutor.h b/src/Storages/MergeTree/MergeMutateExecutor.h
index ec0b71910ef..9f1759ff307 100644
--- a/src/Storages/MergeTree/MergeMutateExecutor.h
+++ b/src/Storages/MergeTree/MergeMutateExecutor.h
@@ -97,17 +97,7 @@ public:
         return true;
     }
 
-    void removeTasksCorrespondingToStorage(StorageID id)
-    {
-        /// Stop scheduler thread and pool
-        auto lock = getUniqueLock();
-        /// Get lock to the tasks
-        std::lock_guard second_lock(mutex);
-
-        size_t erased_count = std::erase_if(tasks, [id = std::move(id)] (auto task) -> bool { return task->getStorageID() == id; });
-        CurrentMetrics::sub(metric, erased_count);
-    }
-
+    void removeTasksCorrespondingToStorage(StorageID id);
 
     void wait()
     {
@@ -125,48 +115,6 @@ public:
 
 private:
 
-    using ExecutorSuspender = std::unique_lock<MergeTreeBackgroundExecutor>;
-    friend class std::unique_lock<MergeTreeBackgroundExecutor>;
-
-    ExecutorSuspender getUniqueLock()
-    {
-        return ExecutorSuspender(*this);
-    }
-
-    /// This is needed to achive mutual exclusion
-    std::mutex lock_mutex;
-
-    void lock()
-    {
-        lock_mutex.lock();
-        suspend();
-    }
-
-    void unlock()
-    {
-        resume();
-        lock_mutex.unlock();
-    }
-
-    void suspend()
-    {
-        {
-            std::unique_lock lock(mutex);
-            shutdown_suspend = true;
-            has_tasks.notify_one();
-        }
-        scheduler.join();
-        pool.wait();
-    }
-
-
-    void resume()
-    {
-        shutdown_suspend = false;
-        scheduler = ThreadFromGlobalPool([this]() { schedulerThreadFunction(); });
-    }
-
-
     void updatePoolConfiguration()
     {
         const auto max_threads = threads_count_getter();
@@ -188,8 +136,16 @@ private:
     CountGetter max_task_count_getter;
     CurrentMetrics::Metric metric;
 
-    using TasksQueue = std::deque<ExecutableTaskPtr>;
-    TasksQueue tasks;
+    /// Tasks waiting to be scheduled; accessed under `mutex`
+    std::deque<ExecutableTaskPtr> tasks;
+
+    /// Serializes calls to removeTasksCorrespondingToStorage()
+    std::mutex remove_mutex;
+
+    /// Guards `currently_executing`
+    std::mutex currently_executing_mutex;
+    /// Tasks handed to the pool, each with a future that becomes ready when the task has finished
+    std::map<ExecutableTaskPtr, std::future<void>> currently_executing;
 
     std::mutex mutex;
     std::condition_variable has_tasks;
diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp
index 30c0f63bde5..4ba7b120fc2 100644
--- a/src/Storages/StorageReplicatedMergeTree.cpp
+++ b/src/Storages/StorageReplicatedMergeTree.cpp
@@ -17,6 +17,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include