This commit is contained in:
Nikita Mikhaylov 2021-08-31 11:02:39 +00:00
parent c4416906c8
commit 1adb9bfe23
5 changed files with 62 additions and 61 deletions

View File

@ -13,7 +13,6 @@ BackgroundJobAssignee::BackgroundJobAssignee(MergeTreeData & data_, BackgroundJo
, data(data_)
, sleep_settings(global_context_->getBackgroundMoveTaskSchedulingSettings())
, rng(randomSeed())
, storage_id(data.getStorageID())
, type(type_)
{
}
@ -107,6 +106,7 @@ void BackgroundJobAssignee::finish()
holder->deactivate();
auto context = getContext();
auto storage_id = data.getStorageID();
context->getMovesExecutor()->removeTasksCorrespondingToStorage(storage_id);
context->getFetchesExecutor()->removeTasksCorrespondingToStorage(storage_id);

View File

@ -48,9 +48,6 @@ private:
/// Mutex for thread safety
std::mutex holder_mutex;
/// Save storage id to prevent use-after-free in destructor
StorageID storage_id;
public:
enum class Type
{

View File

@ -6,12 +6,45 @@
namespace DB
{
void MergeTreeBackgroundExecutor::removeTasksCorrespondingToStorage(StorageID id)
{
std::lock_guard remove_lock(remove_mutex);
/// First stop the scheduler thread
{
std::unique_lock lock(mutex);
shutdown_suspend = true;
has_tasks.notify_one();
}
scheduler.join();
/// Remove tasks
{
std::lock_guard lock(currently_executing_mutex);
for (auto & [task, future] : currently_executing)
{
if (task->getStorageID() == id)
future.wait();
}
/// Remove tasks from original queue
size_t erased_count = std::erase_if(tasks, [id = std::move(id)] (auto task) -> bool { return task->getStorageID() == id; });
CurrentMetrics::sub(metric, erased_count);
}
shutdown_suspend = false;
scheduler = ThreadFromGlobalPool([this]() { schedulerThreadFunction(); });
}
void MergeTreeBackgroundExecutor::schedulerThreadFunction()
{
while (true)
{
ExecutableTaskPtr current;
auto current_promise = std::make_shared<std::promise<void>>();
{
std::unique_lock lock(mutex);
has_tasks.wait(lock, [this](){ return !tasks.empty() || shutdown_suspend; });
@ -26,11 +59,29 @@ void MergeTreeBackgroundExecutor::schedulerThreadFunction()
updatePoolConfiguration();
}
bool res = pool.trySchedule([this, task = current] ()
{
std::lock_guard lock(currently_executing_mutex);
currently_executing.emplace(current, current_promise->get_future());
}
bool res = pool.trySchedule([this, task = current, promise = current_promise] () mutable
{
auto on_exit = [&] ()
{
promise->set_value();
{
std::lock_guard lock(currently_executing_mutex);
currently_executing.erase(task);
}
};
SCOPE_EXIT({ on_exit(); });
try
{
if (task->execute())
bool result = task->execute();
if (result)
{
std::lock_guard guard(mutex);
tasks.emplace_back(task);

View File

@ -97,17 +97,7 @@ public:
return true;
}
void removeTasksCorrespondingToStorage(StorageID id)
{
/// Stop scheduler thread and pool
auto lock = getUniqueLock();
/// Get lock to the tasks
std::lock_guard second_lock(mutex);
size_t erased_count = std::erase_if(tasks, [id = std::move(id)] (auto task) -> bool { return task->getStorageID() == id; });
CurrentMetrics::sub(metric, erased_count);
}
void removeTasksCorrespondingToStorage(StorageID id);
void wait()
{
@ -125,48 +115,6 @@ public:
private:
using ExecutorSuspender = std::unique_lock<MergeTreeBackgroundExecutor>;
friend class std::unique_lock<MergeTreeBackgroundExecutor>;
ExecutorSuspender getUniqueLock()
{
return ExecutorSuspender(*this);
}
/// This is needed to achive mutual exclusion
std::mutex lock_mutex;
void lock()
{
lock_mutex.lock();
suspend();
}
void unlock()
{
resume();
lock_mutex.unlock();
}
void suspend()
{
{
std::unique_lock lock(mutex);
shutdown_suspend = true;
has_tasks.notify_one();
}
scheduler.join();
pool.wait();
}
void resume()
{
shutdown_suspend = false;
scheduler = ThreadFromGlobalPool([this]() { schedulerThreadFunction(); });
}
void updatePoolConfiguration()
{
const auto max_threads = threads_count_getter();
@ -188,8 +136,12 @@ private:
CountGetter max_task_count_getter;
CurrentMetrics::Metric metric;
using TasksQueue = std::deque<ExecutableTaskPtr>;
TasksQueue tasks;
std::deque<ExecutableTaskPtr> tasks;
std::mutex remove_mutex;
std::mutex currently_executing_mutex;
std::map<ExecutableTaskPtr, std::future<void>> currently_executing;
std::mutex mutex;
std::condition_variable has_tasks;

View File

@ -17,6 +17,7 @@
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/MergeList.h>
#include <Storages/MergeTree/MergeMutateExecutor.h>
#include <Storages/MergeTree/MergedBlockOutputStream.h>
#include <Storages/MergeTree/PinnedPartUUIDs.h>
#include <Storages/MergeTree/PartitionPruner.h>