#pragma once #include #include #include #include #include #include #include #include #include #include #include #include #include namespace DB { /** * Executor for background MergeTree-related operations such as merges, mutations, fetches and so on. * It can execute only implementations of the ExecutableTask interface, * which is a hand-written coroutine: a task suspends when its execute() method returns true. * * The executor consists of a ThreadPool that executes pieces of a task (basically, it calls 'execute' on a task) * and a scheduler thread that manages the tasks. Based on past experience with running under * high memory pressure, the scheduler thread must not do any allocations, * because it would be a fatal error if this thread died from a random exception. * * There are two task queues: pending (the main queue for all tasks) and active (tasks currently executing). * Invariant: a task may be present in only one of these queues; it can be in both only inside critical sections. * * Because of the caveats described above, boost::circular_buffer is used as the container for the queues. * * Another nuisance is that background operations always interact with an associated Storage. * So, when a Storage wants to shut down, it must wait until all of its background operations are finished. 
*/ class MergeTreeBackgroundExecutor : public shared_ptr_helper { public: using CountGetter = std::function; using Callback = std::function; enum class Type { MERGE_MUTATE, FETCH, MOVE }; MergeTreeBackgroundExecutor( Type type_, CountGetter && threads_count_getter_, CountGetter && max_task_count_getter_, CurrentMetrics::Metric metric_) : type(type_) , threads_count_getter(threads_count_getter_) , max_task_count_getter(max_task_count_getter_) , metric(metric_) { name = toString(type); updateConfiguration(); } ~MergeTreeBackgroundExecutor() { wait(); } bool trySchedule(ExecutableTaskPtr task); void removeTasksCorrespondingToStorage(StorageID id); void wait(); size_t activeCount() { std::lock_guard lock(mutex); return active.size(); } size_t pendingCount() { std::lock_guard lock(mutex); return pending.size(); } private: void updateConfiguration(); static String toString(Type type); Type type; String name; CountGetter threads_count_getter; CountGetter max_task_count_getter; CurrentMetrics::Metric metric; size_t threads_count{0}; size_t max_tasks_count{0}; AtomicStopwatch update_timer; struct Item { explicit Item(ExecutableTaskPtr && task_, CurrentMetrics::Metric metric_) : task(std::move(task_)) , increment(std::move(metric_)) { } ExecutableTaskPtr task; CurrentMetrics::Increment increment; std::atomic_bool is_currently_deleting{false}; Poco::Event is_done; }; using ItemPtr = std::shared_ptr; void routine(ItemPtr item); void threadFunction(size_t number); /// Initially it will be empty boost::circular_buffer pending{0}; boost::circular_buffer active{0}; std::mutex mutex; std::condition_variable has_tasks; std::atomic_bool shutdown{false}; ThreadPool pool; }; }