#pragma once #include #include #include #include #include #include #include #include #include #include #include #include #include namespace DB { class BackgroundProcessingPool; class BackgroundProcessingPoolTaskInfo; enum class BackgroundProcessingPoolTaskResult { SUCCESS, ERROR, NOTHING_TO_DO, }; /** Using a fixed number of threads, perform an arbitrary number of tasks in an infinite loop. * In this case, one task can run simultaneously from different threads. * Designed for tasks that perform continuous background work (for example, merge). * `Task` is a function that returns a bool - did it do any work. * If not, then the next time will be done later. */ class BackgroundProcessingPool { public: /// Returns true, if some useful work was done. In that case, thread will not sleep before next run of this task. using TaskResult = BackgroundProcessingPoolTaskResult; using Task = std::function; using TaskInfo = BackgroundProcessingPoolTaskInfo; using TaskHandle = std::shared_ptr; BackgroundProcessingPool(int size_); size_t getNumberOfThreads() const { return size; } TaskHandle addTask(const Task & task); void removeTask(const TaskHandle & task); ~BackgroundProcessingPool(); protected: friend class BackgroundProcessingPoolTaskInfo; using Tasks = std::multimap; /// key is desired next time to execute (priority). using Threads = std::vector; const size_t size; Tasks tasks; /// Ordered in priority. std::mutex tasks_mutex; Threads threads; std::atomic shutdown {false}; std::condition_variable wake_event; /// Thread group used for profiling purposes ThreadGroupStatusPtr thread_group; void threadFunction(); }; using BackgroundProcessingPoolPtr = std::shared_ptr; class BackgroundProcessingPoolTaskInfo { public: /// Wake up any thread. void wake(); BackgroundProcessingPoolTaskInfo(BackgroundProcessingPool & pool_, const BackgroundProcessingPool::Task & function_) : pool(pool_), function(function_) {} protected: friend class BackgroundProcessingPool; BackgroundProcessingPool & pool; BackgroundProcessingPool::Task function; /// Read lock is hold when task is executed. std::shared_mutex rwlock; std::atomic removed {false}; std::multimap>::iterator iterator; /// For exponential backoff. size_t count_no_work_done = 0; }; }