ClickHouse/src/Storages/MergeTree/MergeMutateExecutor.h

172 lines
3.5 KiB
C++
Raw Normal View History

2021-08-30 19:37:03 +00:00
#pragma once
#include <deque>
#include <functional>
#include <atomic>
#include <mutex>
#include <condition_variable>
#include <common/shared_ptr_helper.h>
#include <Common/ThreadPool.h>
#include <Storages/MergeTree/ExecutableTask.h>
#include <Storages/MergeTree/MergeTreeData.h>
namespace DB
{
class LambdaAdapter : public shared_ptr_helper<LambdaAdapter>, public ExecutableTask
{
public:
template <typename T>
explicit LambdaAdapter(T && inner_, MergeTreeData & data_) : inner(inner_), data(data_) {}
bool execute() override
{
res = inner();
2021-08-31 14:54:24 +00:00
inner = {};
2021-08-30 19:37:03 +00:00
return false;
}
void onCompleted() override
{
data.triggerBackgroundOperationTask(!res);
}
StorageID getStorageID() override
{
return data.getStorageID();
}
private:
bool res = false;
std::function<bool()> inner;
MergeTreeData & data;
};
class MergeTreeBackgroundExecutor : public shared_ptr_helper<MergeTreeBackgroundExecutor>
{
public:
using CountGetter = std::function<size_t()>;
using Callback = std::function<void()>;
MergeTreeBackgroundExecutor()
{
scheduler = ThreadFromGlobalPool([this]() { schedulerThreadFunction(); });
}
~MergeTreeBackgroundExecutor()
{
wait();
}
void setThreadsCount(CountGetter && getter)
{
threads_count_getter = getter;
}
void setTasksCount(CountGetter && getter)
{
max_task_count_getter = getter;
}
void setMetric(CurrentMetrics::Metric metric_)
{
metric = metric_;
}
bool trySchedule(ExecutableTaskPtr task)
{
std::lock_guard lock(mutex);
if (shutdown_suspend)
return false;
auto & value = CurrentMetrics::values[metric];
if (value.load() >= static_cast<int64_t>(max_task_count_getter()))
return false;
2021-08-31 18:07:24 +00:00
tasks.emplace_back(std::make_shared<Item>(std::move(task), metric));
2021-08-30 19:37:03 +00:00
has_tasks.notify_one();
return true;
}
2021-08-31 11:02:39 +00:00
void removeTasksCorrespondingToStorage(StorageID id);
2021-08-30 19:37:03 +00:00
void wait()
{
{
std::lock_guard lock(mutex);
shutdown_suspend = true;
has_tasks.notify_all();
}
if (scheduler.joinable())
scheduler.join();
pool.wait();
}
2021-08-31 14:54:24 +00:00
size_t active()
{
return pool.active();
}
size_t pending()
{
std::lock_guard lock(mutex);
return tasks.size();
}
2021-08-30 19:37:03 +00:00
private:
void updatePoolConfiguration()
{
const auto max_threads = threads_count_getter();
pool.setMaxFreeThreads(0);
pool.setMaxThreads(max_threads);
pool.setQueueSize(max_threads);
}
void schedulerThreadFunction();
CountGetter threads_count_getter;
CountGetter max_task_count_getter;
CurrentMetrics::Metric metric;
2021-08-31 18:07:24 +00:00
struct Item
{
explicit Item(ExecutableTaskPtr && task_, CurrentMetrics::Metric && metric_)
: task(std::move(task_)), increment(std::move(metric_)) {}
ExecutableTaskPtr task;
CurrentMetrics::Increment increment;
std::promise<void> promise;
};
using ItemPtr = std::shared_ptr<Item>;
std::deque<ItemPtr> tasks;
2021-08-31 11:02:39 +00:00
std::mutex remove_mutex;
std::mutex currently_executing_mutex;
2021-08-31 18:07:24 +00:00
std::set<ItemPtr> currently_executing;
2021-08-30 19:37:03 +00:00
std::mutex mutex;
std::condition_variable has_tasks;
std::atomic_bool shutdown_suspend{false};
ThreadPool pool;
ThreadFromGlobalPool scheduler;
};
}