ClickHouse/src/Storages/MergeTree/MergeMutateExecutor.h

176 lines
3.6 KiB
C++
Raw Normal View History

2021-08-30 19:37:03 +00:00
#pragma once
#include <atomic>
#include <condition_variable>
#include <deque>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <set>
#include <utility>
#include <common/shared_ptr_helper.h>
#include <Common/ThreadPool.h>
#include <Storages/MergeTree/ExecutableTask.h>
#include <Storages/MergeTree/MergeTreeData.h>
namespace DB
{
/// Wraps a callable returning bool into an ExecutableTask.
/// The callable is invoked by execute(); its result is remembered and later passed (negated)
/// to MergeTreeData::triggerBackgroundOperationTask() from onCompleted().
class LambdaAdapter : public shared_ptr_helper<LambdaAdapter>, public ExecutableTask
{
public:
    /// @param inner_ Callable convertible to std::function<bool()>. It is perfectly forwarded,
    ///               so an rvalue callable is moved into the task instead of being copied.
    /// @param data_  Storage the task belongs to. Only a reference is kept, so the storage
    ///               must stay alive for as long as this task can still be used.
    template <typename T>
    explicit LambdaAdapter(T && inner_, MergeTreeData & data_)
        : inner(std::forward<T>(inner_))
        , data(data_)
    {
    }

    /// Invokes the wrapped callable and stores its result for onCompleted().
    /// Always returns false.
    /// NOTE(review): presumably `false` tells the executor that no further execute() calls
    /// are needed - confirm against the ExecutableTask contract.
    bool execute() override
    {
        res = inner();
        /// Reset the std::function right away, so the callable (and everything it captured)
        /// is destroyed here rather than together with the task object.
        inner = {};
        return false;
    }

    /// Forwards the negated result of the callable to the storage.
    /// If execute() has not run, `res` still holds its initial value `false`.
    void onCompleted() override
    {
        data.triggerBackgroundOperationTask(!res);
    }

    /// Returns the identifier of the storage this task was created for.
    StorageID getStorageID() override
    {
        return data.getStorageID();
    }

private:
    /// Result of the last call of `inner`.
    bool res = false;
    /// The wrapped callable; empty after execute() has finished.
    std::function<bool()> inner;
    MergeTreeData & data;
};
/// Executor for background ExecutableTask objects.
///
/// Tasks accepted by trySchedule() are appended to the `pending` queue. A dedicated scheduler
/// thread running schedulerThreadFunction() (defined out of line) is started in the constructor
/// and joined in wait().
/// NOTE(review): presumably the scheduler thread moves items from `pending` to `active` and runs
/// them on `pool` - that logic is not part of this header, confirm in the implementation file.
class MergeTreeBackgroundExecutor : public shared_ptr_helper<MergeTreeBackgroundExecutor>
{
public:
    using CountGetter = std::function<size_t()>;
    using Callback = std::function<void()>;

    /// Starts the scheduler thread. It is started in the constructor body, i.e. after
    /// all data members have been initialized.
    MergeTreeBackgroundExecutor()
    {
        scheduler = ThreadFromGlobalPool([this]() { schedulerThreadFunction(); });
    }

    /// Requests shutdown and waits for the scheduler thread and the pool, see wait().
    ~MergeTreeBackgroundExecutor()
    {
        wait();
    }

    /// Sets the callback that updatePoolConfiguration() uses to obtain the number of pool threads.
    void setThreadsCount(CountGetter && getter)
    {
        /// A named rvalue reference is an lvalue: without std::move the functor would be copied.
        threads_count_getter = std::move(getter);
    }

    /// Sets the callback that trySchedule() uses to obtain the limit for the metric value.
    void setTasksCount(CountGetter && getter)
    {
        max_task_count_getter = std::move(getter);
    }

    /// Sets the metric that is checked in trySchedule() and attached to every scheduled item.
    void setMetric(CurrentMetrics::Metric metric_)
    {
        metric = metric_;
    }

    /// Tries to put the task into the pending queue.
    /// @return false if shutdown has been requested by wait() or if the current value of the
    ///         metric has reached the limit returned by the tasks-count getter; true if the
    ///         task was enqueued (one waiter on `has_tasks` is notified in that case).
    bool trySchedule(ExecutableTaskPtr task)
    {
        std::lock_guard lock(mutex);

        if (shutdown_suspend)
            return false;

        auto & value = CurrentMetrics::values[metric];
        if (value.load() >= static_cast<int64_t>(max_task_count_getter()))
            return false;

        pending.emplace_back(std::make_shared<Item>(std::move(task), metric));
        has_tasks.notify_one();
        return true;
    }

    /// Defined out of line.
    void removeTasksCorrespondingToStorage(StorageID id);

    /// Requests shutdown and blocks until the scheduler thread is joined and `pool` has finished.
    /// After this call trySchedule() returns false; nothing in this header resets the flag.
    void wait()
    {
        {
            std::lock_guard lock(mutex);
            shutdown_suspend = true;
            /// Wake up everything waiting on the condition variable so it can observe the flag.
            has_tasks.notify_all();
        }

        if (scheduler.joinable())
            scheduler.join();

        pool.wait();
    }

    /// Number of items in the `active` set, read under `mutex`.
    size_t activeCount()
    {
        std::lock_guard lock(mutex);
        return active.size();
    }

    /// Number of items in the `pending` queue, read under `mutex`.
    size_t pendingCount()
    {
        std::lock_guard lock(mutex);
        return pending.size();
    }

private:
    /// Applies the current value of the threads-count getter to `pool`:
    /// no free threads are kept, and both the thread limit and the queue size are set to that value.
    void updatePoolConfiguration()
    {
        const auto max_threads = threads_count_getter();
        pool.setMaxFreeThreads(0);
        pool.setMaxThreads(max_threads);
        pool.setQueueSize(max_threads);
    }

    /// Body of the scheduler thread; defined out of line.
    void schedulerThreadFunction();

    CountGetter threads_count_getter;
    CountGetter max_task_count_getter;
    /// NOTE(review): has no initializer here; setMetric() is expected to be called
    /// before the first trySchedule() - confirm at the call sites.
    CurrentMetrics::Metric metric;

    /// A scheduled task together with its bookkeeping.
    struct Item
    {
        explicit Item(ExecutableTaskPtr && task_, CurrentMetrics::Metric metric_)
            : task(std::move(task_))
            , increment(std::move(metric_))
            , future(promise.get_future())
        {
        }

        ExecutableTaskPtr task;
        /// NOTE(review): presumably keeps the metric incremented while the item is alive -
        /// confirm against CurrentMetrics::Increment.
        CurrentMetrics::Increment increment;
        /// `promise` must stay declared before `future`: members are initialized in declaration
        /// order and `future` is obtained from `promise` in the constructor.
        std::promise<void> promise;
        std::future<void> future;
    };

    using ItemPtr = std::shared_ptr<Item>;

    /// Items accepted by trySchedule(), in insertion order.
    std::deque<ItemPtr> pending;
    std::set<ItemPtr> active;
    /// NOTE(review): `currently_deleting` and `remove_mutex` are not used in this header;
    /// presumably they serve removeTasksCorrespondingToStorage() - confirm in the implementation file.
    std::set<StorageID> currently_deleting;
    std::mutex remove_mutex;

    /// Taken by trySchedule(), wait(), activeCount() and pendingCount().
    std::mutex mutex;
    std::condition_variable has_tasks;
    /// Set to true by wait(); makes trySchedule() reject new tasks.
    std::atomic_bool shutdown_suspend{false};

    ThreadPool pool;
    ThreadFromGlobalPool scheduler;
};
}