#pragma once #include #include #include #include #include #include #include #include #include #include #include #include #include namespace DB { class TaskNotification; /** Executes functions scheduled at a specific point in time. * Basically all tasks are added in a queue and precessed by worker threads. * * The most important difference between this and BackgroundProcessingPool * is that we have the guarantee that the same function is not executed from many workers in the same time. * * The usage scenario: instead starting a separate thread for each task, * register a task in BackgroundSchedulePool and when you need to run the task, * call schedule or scheduleAfter(duration) method. */ class BackgroundSchedulePool { public: class TaskInfo; using TaskInfoPtr = std::shared_ptr; using TaskFunc = std::function; using DelayedTasks = std::multimap; class TaskInfo : public std::enable_shared_from_this, private boost::noncopyable { public: TaskInfo(BackgroundSchedulePool & pool_, const std::string & log_name_, const TaskFunc & function_); /// Schedule for execution as soon as possible (if not already scheduled). /// If the task was already scheduled with delay, the delay will be ignored. bool schedule(); /// Schedule for execution after specified delay. bool scheduleAfter(size_t ms); /// Further attempts to schedule become no-op. Will wait till the end of the current execution of the task. void deactivate(); void activate(); /// Atomically activate task and schedule it for execution. bool activateAndSchedule(); /// get zkutil::WatchCallback needed for notifications from ZooKeeper watches. zkutil::WatchCallback getWatchCallback(); private: friend class TaskNotification; friend class BackgroundSchedulePool; void execute(); void scheduleImpl(std::lock_guard & schedule_mutex_lock); BackgroundSchedulePool & pool; std::string log_name; TaskFunc function; std::mutex exec_mutex; std::mutex schedule_mutex; /// Invariants: /// * If deactivated is true then scheduled, delayed and executing are all false. /// * scheduled and delayed cannot be true at the same time. bool deactivated = false; bool scheduled = false; bool delayed = false; bool executing = false; /// If the task is scheduled with delay, points to element of delayed_tasks. DelayedTasks::iterator iterator; }; class TaskHolder { public: TaskHolder() = default; explicit TaskHolder(const TaskInfoPtr & task_info_) : task_info(task_info_) {} TaskHolder(const TaskHolder & other) = delete; TaskHolder(TaskHolder && other) noexcept = default; TaskHolder & operator=(const TaskHolder & other) noexcept = delete; TaskHolder & operator=(TaskHolder && other) noexcept = default; ~TaskHolder() { if (task_info) task_info->deactivate(); } TaskInfo * operator->() { return task_info.get(); } const TaskInfo * operator->() const { return task_info.get(); } private: TaskInfoPtr task_info; }; TaskHolder createTask(const std::string & log_name, const TaskFunc & function); size_t getNumberOfThreads() const { return size; } BackgroundSchedulePool(size_t size); ~BackgroundSchedulePool(); private: using Threads = std::vector; void threadFunction(); void delayExecutionThreadFunction(); /// Schedule task for execution after specified delay from now. void scheduleDelayedTask(const TaskInfoPtr & task_info, size_t ms, std::lock_guard & task_schedule_mutex_lock); /// Remove task, that was scheduled with delay, from schedule. void cancelDelayedTask(const TaskInfoPtr & task_info, std::lock_guard & task_schedule_mutex_lock); /// Number for worker threads. const size_t size; std::atomic shutdown {false}; Threads threads; Poco::NotificationQueue queue; /// Delayed notifications. std::condition_variable wakeup_cond; std::mutex delayed_tasks_mutex; /// Thread waiting for next delayed task. std::thread delayed_thread; /// Tasks ordered by scheduled time. DelayedTasks delayed_tasks; /// Thread group used for profiling purposes ThreadGroupStatusPtr thread_group; void attachToThreadGroup(); }; using BackgroundSchedulePoolPtr = std::shared_ptr; }