diff --git a/src/Common/AsyncLoader.cpp b/src/Common/AsyncLoader.cpp index 6c19b1910eb..b5612517cd6 100644 --- a/src/Common/AsyncLoader.cpp +++ b/src/Common/AsyncLoader.cpp @@ -6,6 +6,7 @@ #include #include #include +#include namespace DB { @@ -41,9 +42,14 @@ std::exception_ptr LoadJob::exception() const return load_exception; } -ssize_t LoadJob::priority() const +size_t LoadJob::executionPool() const { - return load_priority; + return execution_pool_id; +} + +size_t LoadJob::pool() const +{ + return pool_id; } void LoadJob::wait() const @@ -112,8 +118,9 @@ void LoadJob::enqueued() enqueue_time = std::chrono::system_clock::now(); } -void LoadJob::execute(const LoadJobPtr & self) +void LoadJob::execute(size_t pool, const LoadJobPtr & self) { + execution_pool_id = pool; start_time = std::chrono::system_clock::now(); func(self); } @@ -148,22 +155,35 @@ void LoadTask::remove() { loader.remove(jobs); jobs.clear(); + goal_jobs.clear(); } } void LoadTask::detach() { jobs.clear(); + goal_jobs.clear(); } -AsyncLoader::AsyncLoader(Metric metric_threads, Metric metric_active_threads, size_t max_threads_, bool log_failures_, bool log_progress_) + +AsyncLoader::AsyncLoader(std::vector pool_initializers, bool log_failures_, bool log_progress_) : log_failures(log_failures_) , log_progress(log_progress_) , log(&Poco::Logger::get("AsyncLoader")) - , max_threads(max_threads_) - , pool(metric_threads, metric_active_threads, max_threads) { - + pools.reserve(pool_initializers.size()); + for (auto && init : pool_initializers) + pools.push_back({ + .name = init.name, + .priority = init.priority, + .thread_pool = std::make_unique( + init.metric_threads, + init.metric_active_threads, + init.max_threads, + /* max_free_threads = */ 0, + init.max_threads), + .max_threads = init.max_threads + }); } AsyncLoader::~AsyncLoader() @@ -175,13 +195,20 @@ void AsyncLoader::start() { std::unique_lock lock{mutex}; is_running = true; - for (size_t i = 0; workers < max_threads && i < ready_queue.size(); i++) - spawn(lock); + updateCurrentPriorityAndSpawn(lock); } void AsyncLoader::wait() { - pool.wait(); + // Because job can create new jobs in other pools we have to recheck in cycle + std::unique_lock lock{mutex}; + while (!scheduled_jobs.empty()) + { + lock.unlock(); + for (auto & p : pools) + p.thread_pool->wait(); + lock.lock(); + } } void AsyncLoader::stop() @@ -191,7 +218,7 @@ void AsyncLoader::stop() is_running = false; // NOTE: there is no need to notify because workers never wait } - pool.wait(); + wait(); } void AsyncLoader::schedule(LoadTask & task) @@ -229,9 +256,9 @@ void AsyncLoader::scheduleImpl(const LoadJobSet & input_jobs) old_jobs = finished_jobs.size(); } - // Make set of jobs to schedule: + // Pass 1. Make set of jobs to schedule: // 1) exclude already scheduled or finished jobs - // 2) include pending dependencies, that are not yet scheduled + // 2) include assigned job dependencies (that are not yet scheduled) LoadJobSet jobs; for (const auto & job : input_jobs) gatherNotScheduled(job, jobs, lock); @@ -242,17 +269,18 @@ void AsyncLoader::scheduleImpl(const LoadJobSet & input_jobs) // We do not want any exception to be throws after this point, because the following code is not exception-safe DENY_ALLOCATIONS_IN_SCOPE; - // Schedule all incoming jobs + // Pass 2. Schedule all incoming jobs for (const auto & job : jobs) { + chassert(job->pool() < pools.size()); NOEXCEPT_SCOPE({ ALLOW_ALLOCATIONS_IN_SCOPE; - scheduled_jobs.emplace(job, Info{.initial_priority = job->load_priority, .priority = job->load_priority}); + scheduled_jobs.try_emplace(job); job->scheduled(); }); } - // Process dependencies on scheduled pending jobs + // Pass 3. Process dependencies on scheduled jobs, priority inheritance for (const auto & job : jobs) { Info & info = scheduled_jobs.find(job)->second; @@ -267,17 +295,18 @@ void AsyncLoader::scheduleImpl(const LoadJobSet & input_jobs) }); info.dependencies_left++; - // Priority inheritance: prioritize deps to have at least given `priority` to avoid priority inversion - prioritize(dep, info.priority, lock); + // Priority inheritance: prioritize deps to have at least given `pool.priority` to avoid priority inversion + prioritize(dep, job->pool_id, lock); } } // Enqueue non-blocked jobs (w/o dependencies) to ready queue - if (!info.is_blocked()) + if (!info.isBlocked()) enqueue(info, job, lock); } - // Process dependencies on other jobs. It is done in a separate pass to facilitate propagation of cancel signals (if any). + // Pass 4: Process dependencies on other jobs. + // It is done in a separate pass to facilitate cancelling due to already failed dependencies. for (const auto & job : jobs) { if (auto info = scheduled_jobs.find(job); info != scheduled_jobs.end()) @@ -285,12 +314,12 @@ void AsyncLoader::scheduleImpl(const LoadJobSet & input_jobs) for (const auto & dep : job->dependencies) { if (scheduled_jobs.contains(dep)) - continue; // Skip dependencies on scheduled pending jobs (already processed) + continue; // Skip dependencies on scheduled jobs (already processed in pass 3) LoadStatus dep_status = dep->status(); if (dep_status == LoadStatus::OK) continue; // Dependency on already successfully finished job -- it's okay. - // Dependency on not scheduled pending job -- it's bad. + // Dependency on assigned job -- it's bad. // Probably, there is an error in `jobs` set, `gatherNotScheduled()` should have fixed it. chassert(dep_status != LoadStatus::PENDING); @@ -305,7 +334,7 @@ void AsyncLoader::scheduleImpl(const LoadJobSet & input_jobs) job->name, getExceptionMessage(dep->exception(), /* with_stacktrace = */ false))); }); - finish(lock, job, LoadStatus::CANCELED, e); + finish(job, LoadStatus::CANCELED, e, lock); break; // This job is now finished, stop its dependencies processing } } @@ -327,13 +356,14 @@ void AsyncLoader::gatherNotScheduled(const LoadJobPtr & job, LoadJobSet & jobs, } } -void AsyncLoader::prioritize(const LoadJobPtr & job, ssize_t new_priority) +void AsyncLoader::prioritize(const LoadJobPtr & job, size_t new_pool) { if (!job) return; + chassert(new_pool < pools.size()); DENY_ALLOCATIONS_IN_SCOPE; std::unique_lock lock{mutex}; - prioritize(job, new_priority, lock); + prioritize(job, new_pool, lock); } void AsyncLoader::remove(const LoadJobSet & jobs) @@ -347,14 +377,14 @@ void AsyncLoader::remove(const LoadJobSet & jobs) { if (auto info = scheduled_jobs.find(job); info != scheduled_jobs.end()) { - if (info->second.is_executing()) + if (info->second.isExecuting()) continue; // Skip executing jobs on the first pass std::exception_ptr e; NOEXCEPT_SCOPE({ ALLOW_ALLOCATIONS_IN_SCOPE; e = std::make_exception_ptr(Exception(ErrorCodes::ASYNC_LOAD_CANCELED, "Load job '{}' canceled", job->name)); }); - finish(lock, job, LoadStatus::CANCELED, e); + finish(job, LoadStatus::CANCELED, e, lock); } } // On the second pass wait for executing jobs to finish @@ -363,7 +393,7 @@ void AsyncLoader::remove(const LoadJobSet & jobs) if (auto info = scheduled_jobs.find(job); info != scheduled_jobs.end()) { // Job is currently executing - chassert(info->second.is_executing()); + chassert(info->second.isExecuting()); lock.unlock(); job->waitNoThrow(); // Wait for job to finish lock.lock(); @@ -379,25 +409,36 @@ void AsyncLoader::remove(const LoadJobSet & jobs) } } -void AsyncLoader::setMaxThreads(size_t value) +void AsyncLoader::setMaxThreads(size_t pool, size_t value) { std::unique_lock lock{mutex}; - pool.setMaxThreads(value); - pool.setMaxFreeThreads(value); - pool.setQueueSize(value); - max_threads = value; + auto & p = pools[pool]; + p.thread_pool->setMaxThreads(value); + p.thread_pool->setQueueSize(value); // Keep queue size equal max threads count to avoid blocking during spawning + p.max_threads = value; if (!is_running) return; - for (size_t i = 0; workers < max_threads && i < ready_queue.size(); i++) - spawn(lock); + for (size_t i = 0; canSpawnWorker(p, lock) && i < p.ready_queue.size(); i++) + spawn(p, lock); } -size_t AsyncLoader::getMaxThreads() const +size_t AsyncLoader::getMaxThreads(size_t pool) const { std::unique_lock lock{mutex}; - return max_threads; + return pools[pool].max_threads; } +const String & AsyncLoader::getPoolName(size_t pool) const +{ + return pools[pool].name; // NOTE: lock is not needed because `name` is const and `pools` are immutable +} + +ssize_t AsyncLoader::getPoolPriority(size_t pool) const +{ + return pools[pool].priority; // NOTE: lock is not needed because `priority` is const and `pools` are immutable +} + + size_t AsyncLoader::getScheduledJobCount() const { std::unique_lock lock{mutex}; @@ -412,11 +453,10 @@ std::vector AsyncLoader::getJobStates() const states.emplace(job->name, JobState{ .job = job, .dependencies_left = info.dependencies_left, - .is_executing = info.is_executing(), - .is_blocked = info.is_blocked(), - .is_ready = info.is_ready(), - .initial_priority = info.initial_priority, - .ready_seqno = last_ready_seqno + .ready_seqno = info.ready_seqno, + .is_blocked = info.isBlocked(), + .is_ready = info.isReady(), + .is_executing = info.isExecuting() }); for (const auto & job : finished_jobs) states.emplace(job->name, JobState{.job = job}); @@ -462,21 +502,21 @@ String AsyncLoader::checkCycleImpl(const LoadJobPtr & job, LoadJobSet & left, Lo return {}; } -void AsyncLoader::finish(std::unique_lock & lock, const LoadJobPtr & job, LoadStatus status, std::exception_ptr exception_from_job) +void AsyncLoader::finish(const LoadJobPtr & job, LoadStatus status, std::exception_ptr exception_from_job, std::unique_lock & lock) { + chassert(scheduled_jobs.contains(job)); // Job was pending if (status == LoadStatus::OK) { // Notify waiters job->ok(); // Update dependent jobs and enqueue if ready - chassert(scheduled_jobs.contains(job)); // Job was pending for (const auto & dep : scheduled_jobs[job].dependent_jobs) { chassert(scheduled_jobs.contains(dep)); // All depended jobs must be pending Info & dep_info = scheduled_jobs[dep]; dep_info.dependencies_left--; - if (!dep_info.is_blocked()) + if (!dep_info.isBlocked()) enqueue(dep_info, dep, lock); } } @@ -488,11 +528,10 @@ void AsyncLoader::finish(std::unique_lock & lock, const LoadJobPtr & else if (status == LoadStatus::CANCELED) job->canceled(exception_from_job); - chassert(scheduled_jobs.contains(job)); // Job was pending Info & info = scheduled_jobs[job]; - if (info.is_ready()) + if (info.isReady()) { - ready_queue.erase(info.key()); + pools[job->pool_id].ready_queue.erase(info.ready_seqno); info.ready_seqno = 0; } @@ -512,7 +551,7 @@ void AsyncLoader::finish(std::unique_lock & lock, const LoadJobPtr & dep->name, getExceptionMessage(exception_from_job, /* with_stacktrace = */ false))); }); - finish(lock, dep, LoadStatus::CANCELED, e); + finish(dep, LoadStatus::CANCELED, e, lock); } // Clean dependency graph edges pointing to canceled jobs @@ -531,87 +570,130 @@ void AsyncLoader::finish(std::unique_lock & lock, const LoadJobPtr & }); } -void AsyncLoader::prioritize(const LoadJobPtr & job, ssize_t new_priority, std::unique_lock & lock) +void AsyncLoader::prioritize(const LoadJobPtr & job, size_t new_pool_id, std::unique_lock & lock) { if (auto info = scheduled_jobs.find(job); info != scheduled_jobs.end()) { - if (info->second.priority >= new_priority) - return; // Never lower priority + Pool & old_pool = pools[job->pool_id]; + Pool & new_pool = pools[new_pool_id]; + if (old_pool.priority >= new_pool.priority) + return; // Never lower priority or change pool leaving the same priority // Update priority and push job forward through ready queue if needed - if (info->second.ready_seqno) - ready_queue.erase(info->second.key()); - info->second.priority = new_priority; - job->load_priority.store(new_priority); // Set user-facing priority (may affect executing jobs) - if (info->second.ready_seqno) + UInt64 ready_seqno = info->second.ready_seqno; + + // Requeue job into the new pool queue without allocations + if (ready_seqno) { - NOEXCEPT_SCOPE({ - ALLOW_ALLOCATIONS_IN_SCOPE; - ready_queue.emplace(info->second.key(), job); - }); + new_pool.ready_queue.insert(old_pool.ready_queue.extract(ready_seqno)); + if (canSpawnWorker(new_pool, lock)) + spawn(new_pool, lock); } + // Set user-facing pool and priority (may affect executing jobs) + job->pool_id.store(new_pool_id); + // Recurse into dependencies for (const auto & dep : job->dependencies) - prioritize(dep, new_priority, lock); + prioritize(dep, new_pool_id, lock); } } void AsyncLoader::enqueue(Info & info, const LoadJobPtr & job, std::unique_lock & lock) { - chassert(!info.is_blocked()); + chassert(!info.isBlocked()); chassert(info.ready_seqno == 0); info.ready_seqno = ++last_ready_seqno; + Pool & pool = pools[job->pool_id]; NOEXCEPT_SCOPE({ ALLOW_ALLOCATIONS_IN_SCOPE; - ready_queue.emplace(info.key(), job); + pool.ready_queue.emplace(info.ready_seqno, job); }); job->enqueued(); - if (is_running && workers < max_threads) - spawn(lock); + if (canSpawnWorker(pool, lock)) + spawn(pool, lock); } -void AsyncLoader::spawn(std::unique_lock &) +bool AsyncLoader::canSpawnWorker(Pool & pool, std::unique_lock &) { - workers++; + return is_running + && !pool.ready_queue.empty() + && pool.workers < pool.max_threads + && (!current_priority || *current_priority <= pool.priority); +} + +bool AsyncLoader::canWorkerLive(Pool & pool, std::unique_lock &) +{ + return is_running + && !pool.ready_queue.empty() + && pool.workers <= pool.max_threads + && (!current_priority || *current_priority <= pool.priority); +} + +void AsyncLoader::updateCurrentPriorityAndSpawn(std::unique_lock & lock) +{ + // Find current priority. + // NOTE: We assume low number of pools, so O(N) scans are fine. + std::optional priority; + for (Pool & pool : pools) + { + if (pool.isActive() && (!priority || *priority < pool.priority)) + priority = pool.priority; + } + current_priority = priority; + + // Spawn workers in all pools with current priority + for (Pool & pool : pools) + { + for (size_t i = 0; canSpawnWorker(pool, lock) && i < pool.ready_queue.size(); i++) + spawn(pool, lock); + } +} + +void AsyncLoader::spawn(Pool & pool, std::unique_lock &) +{ + pool.workers++; + current_priority = pool.priority; // canSpawnWorker() ensures this would not decrease current_priority NOEXCEPT_SCOPE({ ALLOW_ALLOCATIONS_IN_SCOPE; - pool.scheduleOrThrowOnError([this] { worker(); }); + pool.thread_pool->scheduleOrThrowOnError([this, &pool] { worker(pool); }); }); } -void AsyncLoader::worker() +void AsyncLoader::worker(Pool & pool) { DENY_ALLOCATIONS_IN_SCOPE; + size_t pool_id = &pool - &*pools.begin(); LoadJobPtr job; std::exception_ptr exception_from_job; while (true) { // This is inside the loop to also reset previous thread names set inside the jobs - setThreadName("AsyncLoader"); + setThreadName(pool.name.c_str()); { std::unique_lock lock{mutex}; // Handle just executed job if (exception_from_job) - finish(lock, job, LoadStatus::FAILED, exception_from_job); + finish(job, LoadStatus::FAILED, exception_from_job, lock); else if (job) - finish(lock, job, LoadStatus::OK); + finish(job, LoadStatus::OK, {}, lock); - if (!is_running || ready_queue.empty() || workers > max_threads) + if (!canWorkerLive(pool, lock)) { - workers--; + if (--pool.workers == 0) + updateCurrentPriorityAndSpawn(lock); // It will spawn lower priority workers if needed return; } // Take next job to be executed from the ready queue - auto it = ready_queue.begin(); + auto it = pool.ready_queue.begin(); job = it->second; - ready_queue.erase(it); + pool.ready_queue.erase(it); scheduled_jobs.find(job)->second.ready_seqno = 0; // This job is no longer in the ready queue } @@ -619,7 +701,7 @@ void AsyncLoader::worker() try { - job->execute(job); + job->execute(pool_id, job); exception_from_job = {}; } catch (...) diff --git a/src/Common/AsyncLoader.h b/src/Common/AsyncLoader.h index d4a3218a541..15f7ae9722b 100644 --- a/src/Common/AsyncLoader.h +++ b/src/Common/AsyncLoader.h @@ -12,7 +12,7 @@ #include #include #include -#include +#include namespace Poco { class Logger; } @@ -46,22 +46,28 @@ class LoadJob : private boost::noncopyable { public: template - LoadJob(LoadJobSetType && dependencies_, String name_, Func && func_, ssize_t priority_ = 0) + LoadJob(LoadJobSetType && dependencies_, String name_, size_t pool_id_, Func && func_) : dependencies(std::forward(dependencies_)) , name(std::move(name_)) + , pool_id(pool_id_) , func(std::forward(func_)) - , load_priority(priority_) {} // Current job status. LoadStatus status() const; std::exception_ptr exception() const; - // Returns current value of a priority of the job. May differ from initial priority. - ssize_t priority() const; + // Returns pool in which the job is executing (was executed). May differ from initial pool and from current pool. + // Value is only valid (and constant) after execution started. + size_t executionPool() const; + + // Returns current pool of the job. May differ from initial and execution pool. + // This value is intended for creating new jobs during this job execution. + // Value may change during job execution by `prioritize()`. + size_t pool() const; // Sync wait for a pending job to be finished: OK, FAILED or CANCELED status. - // Throws if job is FAILED or CANCELED. Returns or throws immediately on non-pending job. + // Throws if job is FAILED or CANCELED. Returns or throws immediately if called on non-pending job. void wait() const; // Wait for a job to reach any non PENDING status. @@ -90,10 +96,11 @@ private: void scheduled(); void enqueued(); - void execute(const LoadJobPtr & self); + void execute(size_t pool, const LoadJobPtr & self); + std::atomic execution_pool_id; + std::atomic pool_id; std::function func; - std::atomic load_priority; mutable std::mutex mutex; mutable std::condition_variable finished; @@ -115,25 +122,25 @@ struct EmptyJobFunc template LoadJobPtr makeLoadJob(LoadJobSet && dependencies, String name, Func && func = EmptyJobFunc()) { - return std::make_shared(std::move(dependencies), std::move(name), std::forward(func)); + return std::make_shared(std::move(dependencies), std::move(name), 0, std::forward(func)); } template LoadJobPtr makeLoadJob(const LoadJobSet & dependencies, String name, Func && func = EmptyJobFunc()) { - return std::make_shared(dependencies, std::move(name), std::forward(func)); + return std::make_shared(dependencies, std::move(name), 0, std::forward(func)); } template -LoadJobPtr makeLoadJob(LoadJobSet && dependencies, ssize_t priority, String name, Func && func = EmptyJobFunc()) +LoadJobPtr makeLoadJob(LoadJobSet && dependencies, size_t pool_id, String name, Func && func = EmptyJobFunc()) { - return std::make_shared(std::move(dependencies), std::move(name), std::forward(func), priority); + return std::make_shared(std::move(dependencies), std::move(name), pool_id, std::forward(func)); } template -LoadJobPtr makeLoadJob(const LoadJobSet & dependencies, ssize_t priority, String name, Func && func = EmptyJobFunc()) +LoadJobPtr makeLoadJob(const LoadJobSet & dependencies, size_t pool_id, String name, Func && func = EmptyJobFunc()) { - return std::make_shared(dependencies, std::move(name), std::forward(func), priority); + return std::make_shared(dependencies, std::move(name), pool_id, std::forward(func)); } // Represents a logically connected set of LoadJobs required to achieve some goals (final LoadJob in the set). @@ -185,7 +192,7 @@ inline void scheduleLoad(const LoadTaskPtrs & tasks) } template -inline void scheduleLoad(Args && ... args) +inline void scheduleLoadAll(Args && ... args) { (scheduleLoad(std::forward(args)), ...); } @@ -208,16 +215,16 @@ inline void waitLoad(const LoadTaskPtrs & tasks) } template -inline void waitLoad(Args && ... args) +inline void waitLoadAll(Args && ... args) { (waitLoad(std::forward(args)), ...); } template -inline void scheduleAndWaitLoad(Args && ... args) +inline void scheduleAndWaitLoadAll(Args && ... args) { - scheduleLoad(std::forward(args)...); - waitLoad(std::forward(args)...); + scheduleLoadAll(std::forward(args)...); + waitLoadAll(std::forward(args)...); } inline LoadJobSet getGoals(const LoadTaskPtrs & tasks) @@ -228,6 +235,14 @@ inline LoadJobSet getGoals(const LoadTaskPtrs & tasks) return result; } +inline LoadJobSet getGoalsOr(const LoadTaskPtrs & tasks, const LoadJobSet & alternative) +{ + LoadJobSet result; + for (const auto & task : tasks) + result.insert(task->goals().begin(), task->goals().end()); + return result.empty() ? alternative : result; +} + inline LoadJobSet joinJobs(const LoadJobSet & jobs1, const LoadJobSet & jobs2) { LoadJobSet result; @@ -251,100 +266,117 @@ inline LoadTaskPtrs joinTasks(const LoadTaskPtrs & tasks1, const LoadTaskPtrs & return result; } -// `AsyncLoader` is a scheduler for DAG of `LoadJob`s. It tracks dependencies and priorities of jobs. +// `AsyncLoader` is a scheduler for DAG of `LoadJob`s. It tracks job dependencies and priorities. // Basic usage example: +// // Start async_loader with two thread pools (0=bg, 1=fg): +// AsyncLoader async_loader({ +// {"BgPool", CurrentMetrics::AsyncLoaderThreads, CurrentMetrics::AsyncLoaderThreadsActive, .max_threads = 1, .priority = 0} +// {"FgPool", CurrentMetrics::AsyncLoaderThreads, CurrentMetrics::AsyncLoaderThreadsActive, .max_threads = 2, .priority = 1} +// }); +// +// // Create and schedule a task consisting of three jobs. Job1 has no dependencies and is run first. +// // Job2 and job3 depend on job1 and are run only after job1 completion. // auto job_func = [&] (const LoadJobPtr & self) { -// LOG_TRACE(log, "Executing load job '{}' with priority '{}'", self->name, self->priority()); +// LOG_TRACE(log, "Executing load job '{}' in pool '{}'", self->name, async_loader->getPoolName(self->pool())); // }; -// auto job1 = makeLoadJob({}, "job1", job_func); -// auto job2 = makeLoadJob({ job1 }, "job2", job_func); -// auto job3 = makeLoadJob({ job1 }, "job3", job_func); +// auto job1 = makeLoadJob({}, "job1", /* pool_id = */ 0, job_func); +// auto job2 = makeLoadJob({ job1 }, "job2", /* pool_id = */ 0, job_func); +// auto job3 = makeLoadJob({ job1 }, "job3", /* pool_id = */ 0, job_func); // auto task = makeLoadTask(async_loader, { job1, job2, job3 }); // task.schedule(); -// Here we have created and scheduled a task consisting of three jobs. Job1 has no dependencies and is run first. -// Job2 and job3 depend on job1 and are run only after job1 completion. Another thread may prioritize a job and wait for it: -// async_loader->prioritize(job3, /* priority = */ 1); // higher priority jobs are run first, default priority is zero. +// +// // Another thread may prioritize a job by changing its pool and wait for it: +// async_loader->prioritize(job3, /* pool_id = */ 1); // higher priority jobs are run first, default priority is zero. // job3->wait(); // blocks until job completion or cancellation and rethrow an exception (if any) // -// AsyncLoader tracks state of all scheduled jobs. Job lifecycle is the following: -// 1) Job is constructed with PENDING status and initial priority. The job is placed into a task. -// 2) The task is scheduled with all its jobs and their dependencies. A scheduled job may be ready (i.e. have all its dependencies finished) or blocked. -// 3a) When all dependencies are successfully executed, the job became ready. A ready job is enqueued into the ready queue. +// Every job has a pool associated with it. AsyncLoader starts every job in its thread pool. +// Each pool has a constant priority and a mutable maximum number of threads. +// Higher priority (greater `pool.priority` value) jobs are run first. +// No job with lower priority is started while there is at least one higher priority job ready or running. +// +// Job priority can be elevated (but cannot be lowered) +// (a) if either it has a dependent job with higher priority: +// in this case the priority and the pool of a dependent job is inherited during `schedule()` call; +// (b) or job was explicitly prioritized by `prioritize(job, higher_priority_pool)` call: +// this also leads to a priority inheritance for all the dependencies. +// Value stored in load job `pool_id` field is atomic and can be changed even during job execution. +// Job is, of course, not moved from its initial thread pool, but it should use `self->pool()` for +// all new jobs it create to avoid priority inversion. +// +// === IMPLEMENTATION DETAILS === +// All possible states and statuses of a job: +// .---------- scheduled ----------. +// ctor --> assigned --> blocked --> ready --> executing --> finished ------> removed --> dtor +// STATUS: '------------------ PENDING -----------------' '-- OK|FAILED|CANCELED --' +// +// AsyncLoader tracks state of all scheduled and finished jobs. Job lifecycle is the following: +// 1) A job is constructed with PENDING status and assigned to a pool. The job is placed into a task. +// 2) The task is scheduled with all its jobs and their dependencies. A scheduled job may be ready, blocked (and later executing). +// 3a) When all dependencies are successfully finished, the job became ready. A ready job is enqueued into the ready queue of its pool. // 3b) If at least one of the job dependencies is failed or canceled, then this job is canceled (with all it's dependent jobs as well). // On cancellation an ASYNC_LOAD_CANCELED exception is generated and saved inside LoadJob object. The job status is changed to CANCELED. // Exception is rethrown by any existing or new `wait()` call. The job is moved to the set of the finished jobs. -// 4) The scheduled pending ready job starts execution by a worker. The job is dequeued. Callback `job_func` is called. -// Status of an executing job is PENDING. And it is still considered as a scheduled job by AsyncLoader. -// Note that `job_func` of a CANCELED job is never executed. +// 4) The ready job starts execution by a worker. The job is dequeued. Callback `job_func` is called. +// Status of an executing job is PENDING. Note that `job_func` of a CANCELED job is never executed. // 5a) On successful execution the job status is changed to OK and all existing and new `wait()` calls finish w/o exceptions. // 5b) Any exception thrown out of `job_func` is wrapped into an ASYNC_LOAD_FAILED exception and saved inside LoadJob. // The job status is changed to FAILED. All the dependent jobs are canceled. The exception is rethrown from all existing and new `wait()` calls. // 6) The job is no longer considered as scheduled and is instead moved to the finished jobs set. This is just for introspection of the finished jobs. // 7) The task containing this job is destructed or `remove()` is explicitly called. The job is removed from the finished job set. // 8) The job is destructed. -// -// Every job has a priority associated with it. AsyncLoader runs higher priority (greater `priority` value) jobs first. Job priority can be elevated -// (a) if either it has a dependent job with higher priority (in this case priority of a dependent job is inherited); -// (b) or job was explicitly prioritized by `prioritize(job, higher_priority)` call (this also leads to a priority inheritance for all the dependencies). -// Note that to avoid priority inversion `job_func` should use `self->priority()` to schedule new jobs in AsyncLoader or any other pool. -// Value stored in load job priority field is atomic and can be increased even during job execution. -// -// When a task is scheduled it can contain dependencies on previously scheduled jobs. These jobs can have any status. If job A being scheduled depends on -// another job B that is not yet scheduled, then job B will also be scheduled (even if the task does not contain it). class AsyncLoader : private boost::noncopyable { private: - // Key of a pending job in the ready queue. - struct ReadyKey + // Thread pool for job execution. + // Pools control the following aspects of job execution: + // 1) Concurrency: Amount of concurrently executing jobs in a pool is `max_threads`. + // 2) Priority: As long as there is executing worker with higher priority, workers with lower priorities are not started + // (although, they can finish last job started before higher priority jobs appeared) + struct Pool { - ssize_t priority; // Ascending order - ssize_t initial_priority; // Ascending order - UInt64 ready_seqno; // Descending order + const String name; + const ssize_t priority; + std::unique_ptr thread_pool; // NOTE: we avoid using a `ThreadPool` queue to be able to move jobs between pools. + std::map ready_queue; // FIFO queue of jobs to be executed in this pool. Map is used for faster erasing. Key is `ready_seqno` + size_t max_threads; // Max number of workers to be spawn + size_t workers = 0; // Number of currently execution workers - bool operator<(const ReadyKey & rhs) const - { - if (priority > rhs.priority) - return true; - if (priority < rhs.priority) - return false; - if (initial_priority > rhs.initial_priority) - return true; - if (initial_priority < rhs.initial_priority) - return false; - return ready_seqno < rhs.ready_seqno; - } + bool isActive() const { return workers > 0 || !ready_queue.empty(); } }; // Scheduling information for a pending job. struct Info { - ssize_t initial_priority = 0; // Initial priority passed into schedule(). - ssize_t priority = 0; // Elevated priority, due to priority inheritance or prioritize(). size_t dependencies_left = 0; // Current number of dependencies on pending jobs. UInt64 ready_seqno = 0; // Zero means that job is not in ready queue. LoadJobSet dependent_jobs; // Set of jobs dependent on this job. - // Three independent states of a non-finished job. - bool is_blocked() const { return dependencies_left > 0; } - bool is_ready() const { return dependencies_left == 0 && ready_seqno > 0; } - bool is_executing() const { return dependencies_left == 0 && ready_seqno == 0; } - - // Get key of a ready job - ReadyKey key() const - { - return {.priority = priority, .initial_priority = initial_priority, .ready_seqno = ready_seqno}; - } + // Three independent states of a scheduled job. + bool isBlocked() const { return dependencies_left > 0; } + bool isReady() const { return dependencies_left == 0 && ready_seqno > 0; } + bool isExecuting() const { return dependencies_left == 0 && ready_seqno == 0; } }; public: using Metric = CurrentMetrics::Metric; - AsyncLoader(Metric metric_threads, Metric metric_active_threads, size_t max_threads_, bool log_failures_, bool log_progress_); + // Helper struct for AsyncLoader construction + struct PoolInitializer + { + String name; + Metric metric_threads; + Metric metric_active_threads; + size_t max_threads; + ssize_t priority; + }; + AsyncLoader(std::vector pool_initializers, bool log_failures_, bool log_progress_); + + // Stops AsyncLoader before destruction // WARNING: all tasks instances should be destructed before associated AsyncLoader. ~AsyncLoader(); - // Start workers to execute scheduled load jobs. + // Start workers to execute scheduled load jobs. Note that AsyncLoader is constructed as already started. void start(); // Wait for all load jobs to finish, including all new jobs. So at first take care to stop adding new jobs. @@ -356,28 +388,32 @@ public: // - or canceled using ~Task() or remove() later. void stop(); - // Schedule all jobs of given `task` and their dependencies (if any, not scheduled yet). - // Higher priority jobs (with greater `job->priority()` value) are executed earlier. - // All dependencies of a scheduled job inherit its priority if it is higher. This way higher priority job - // never wait for (blocked by) lower priority jobs. No priority inversion is possible. + // Schedule all jobs of given `task` and their dependencies (even if they are not in task). + // All dependencies of a scheduled job inherit its pool if it has higher priority. This way higher priority job + // never waits for (blocked by) lower priority jobs. No priority inversion is possible. + // Idempotent: multiple schedule() calls for the same job are no-op. // Note that `task` destructor ensures that all its jobs are finished (OK, FAILED or CANCELED) // and are removed from AsyncLoader, so it is thread-safe to destroy them. void schedule(LoadTask & task); void schedule(const LoadTaskPtr & task); // Schedule all tasks atomically. To ensure only highest priority jobs among all tasks are run first. - void schedule(const std::vector & tasks); + void schedule(const LoadTaskPtrs & tasks); // Increase priority of a job and all its dependencies recursively. - void prioritize(const LoadJobPtr & job, ssize_t new_priority); + // Jobs from higher (than `new_pool`) priority pools are not changed. + void prioritize(const LoadJobPtr & job, size_t new_pool); // Remove finished jobs, cancel scheduled jobs, wait for executing jobs to finish and remove them. void remove(const LoadJobSet & jobs); - // Increase or decrease maximum number of simultaneously executing jobs. - void setMaxThreads(size_t value); + // Increase or decrease maximum number of simultaneously executing jobs in `pool`. + void setMaxThreads(size_t pool, size_t value); + + size_t getMaxThreads(size_t pool) const; + const String & getPoolName(size_t pool) const; + ssize_t getPoolPriority(size_t pool) const; - size_t getMaxThreads() const; size_t getScheduledJobCount() const; // Helper class for introspection @@ -385,11 +421,10 @@ public: { LoadJobPtr job; size_t dependencies_left = 0; - bool is_executing = false; + UInt64 ready_seqno = 0; bool is_blocked = false; bool is_ready = false; - std::optional initial_priority; - std::optional ready_seqno; + bool is_executing = false; }; // For introspection and debug only, see `system.async_loader` table @@ -398,42 +433,32 @@ public: private: void checkCycle(const LoadJobSet & jobs, std::unique_lock & lock); String checkCycleImpl(const LoadJobPtr & job, LoadJobSet & left, LoadJobSet & visited, std::unique_lock & lock); - void finish(std::unique_lock & lock, const LoadJobPtr & job, LoadStatus status, std::exception_ptr exception_from_job = {}); + void finish(const LoadJobPtr & job, LoadStatus status, std::exception_ptr exception_from_job, std::unique_lock & lock); void scheduleImpl(const LoadJobSet & input_jobs); void gatherNotScheduled(const LoadJobPtr & job, LoadJobSet & jobs, std::unique_lock & lock); - void prioritize(const LoadJobPtr & job, ssize_t new_priority, std::unique_lock & lock); + void prioritize(const LoadJobPtr & job, size_t new_pool_id, std::unique_lock & lock); void enqueue(Info & info, const LoadJobPtr & job, std::unique_lock & lock); - void spawn(std::unique_lock &); - void worker(); + bool canSpawnWorker(Pool & pool, std::unique_lock &); + bool canWorkerLive(Pool & pool, std::unique_lock &); + void updateCurrentPriorityAndSpawn(std::unique_lock &); + void spawn(Pool & pool, std::unique_lock &); + void worker(Pool & pool); // Logging const bool log_failures; // Worker should log all exceptions caught from job functions. const bool log_progress; // Periodically log total progress Poco::Logger * log; - std::chrono::system_clock::time_point busy_period_start_time; - AtomicStopwatch stopwatch; - size_t old_jobs = 0; // Number of jobs that were finished in previous busy period (for correct progress indication) mutable std::mutex mutex; // Guards all the fields below. - bool is_running = false; - - // Full set of scheduled pending jobs along with scheduling info. - std::unordered_map scheduled_jobs; - - // Subset of scheduled pending non-blocked jobs (waiting for a worker to be executed). - // Represent a queue of jobs in order of decreasing priority and FIFO for jobs with equal priorities. - std::map ready_queue; - - // Set of finished jobs (for introspection only, until jobs are removed). - LoadJobSet finished_jobs; - - // Increasing counter for `ReadyKey` assignment (to preserve FIFO order of the jobs with equal priorities). - UInt64 last_ready_seqno = 0; - - // For executing jobs. Note that we avoid using an internal queue of the pool to be able to prioritize jobs. - size_t max_threads; - size_t workers = 0; - ThreadPool pool; + bool is_running = true; + std::optional current_priority; // highest priority among active pools + UInt64 last_ready_seqno = 0; // Increasing counter for ready queue keys. + std::unordered_map scheduled_jobs; // Full set of scheduled pending jobs along with scheduling info. + std::vector pools; // Thread pools for job execution and ready queues + LoadJobSet finished_jobs; // Set of finished jobs (for introspection only, until jobs are removed). + AtomicStopwatch stopwatch; // For progress indication + size_t old_jobs = 0; // Number of jobs that were finished in previous busy period (for correct progress indication) + std::chrono::system_clock::time_point busy_period_start_time; }; } diff --git a/src/Common/tests/gtest_async_loader.cpp b/src/Common/tests/gtest_async_loader.cpp index 5666c4b923e..af685b10f4e 100644 --- a/src/Common/tests/gtest_async_loader.cpp +++ b/src/Common/tests/gtest_async_loader.cpp @@ -30,6 +30,11 @@ namespace DB::ErrorCodes extern const int ASYNC_LOAD_CANCELED; } +struct Initializer { + size_t max_threads = 1; + ssize_t priority = 0; +}; + struct AsyncLoaderTest { AsyncLoader loader; @@ -37,10 +42,34 @@ struct AsyncLoaderTest std::mutex rng_mutex; pcg64 rng{randomSeed()}; + explicit AsyncLoaderTest(std::vector initializers) + : loader(getPoolInitializers(initializers), /* log_failures = */ false, /* log_progress = */ false) + { + loader.stop(); // All tests call `start()` manually to better control ordering + } + explicit AsyncLoaderTest(size_t max_threads = 1) - : loader(CurrentMetrics::TablesLoaderThreads, CurrentMetrics::TablesLoaderThreadsActive, max_threads, /* log_failures = */ false, /* log_progress = */ false) + : AsyncLoaderTest({{.max_threads = max_threads}}) {} + std::vector getPoolInitializers(std::vector initializers) + { + std::vector result; + size_t pool_id = 0; + for (auto & desc : initializers) + { + result.push_back({ + .name = fmt::format("Pool{}", pool_id), + .metric_threads = CurrentMetrics::TablesLoaderThreads, + .metric_active_threads = CurrentMetrics::TablesLoaderThreadsActive, + .max_threads = desc.max_threads, + .priority = desc.priority + }); + pool_id++; + } + return result; + } + template T randomInt(T from, T to) { @@ -114,16 +143,19 @@ struct AsyncLoaderTest TEST(AsyncLoader, Smoke) { - AsyncLoaderTest t(2); + AsyncLoaderTest t({ + {.max_threads = 2, .priority = 0}, + {.max_threads = 2, .priority = -1}, + }); - static constexpr ssize_t low_priority = -1; + static constexpr ssize_t low_priority_pool = 1; std::atomic jobs_done{0}; std::atomic low_priority_jobs_done{0}; auto job_func = [&] (const LoadJobPtr & self) { jobs_done++; - if (self->priority() == low_priority) + if (self->pool() == low_priority_pool) low_priority_jobs_done++; }; @@ -135,7 +167,7 @@ TEST(AsyncLoader, Smoke) auto job3 = makeLoadJob({ job2 }, "job3", job_func); auto job4 = makeLoadJob({ job2 }, "job4", job_func); auto task2 = t.schedule({ job3, job4 }); - auto job5 = makeLoadJob({ job3, job4 }, low_priority, "job5", job_func); + auto job5 = makeLoadJob({ job3, job4 }, low_priority_pool, "job5", job_func); task2->merge(t.schedule({ job5 })); std::thread waiter_thread([=] { job5->wait(); }); @@ -536,7 +568,7 @@ TEST(AsyncLoader, TestOverload) AsyncLoaderTest t(3); t.loader.start(); - size_t max_threads = t.loader.getMaxThreads(); + size_t max_threads = t.loader.getMaxThreads(/* pool = */ 0); std::atomic executing{0}; for (int concurrency = 4; concurrency <= 8; concurrency++) @@ -562,13 +594,24 @@ TEST(AsyncLoader, TestOverload) TEST(AsyncLoader, StaticPriorities) { - AsyncLoaderTest t(1); + AsyncLoaderTest t({ + {.max_threads = 1, .priority = 0}, + {.max_threads = 1, .priority = 1}, + {.max_threads = 1, .priority = 2}, + {.max_threads = 1, .priority = 3}, + {.max_threads = 1, .priority = 4}, + {.max_threads = 1, .priority = 5}, + {.max_threads = 1, .priority = 6}, + {.max_threads = 1, .priority = 7}, + {.max_threads = 1, .priority = 8}, + {.max_threads = 1, .priority = 9}, + }); std::string schedule; auto job_func = [&] (const LoadJobPtr & self) { - schedule += fmt::format("{}{}", self->name, self->priority()); + schedule += fmt::format("{}{}", self->name, self->pool()); }; std::vector jobs; @@ -588,21 +631,110 @@ TEST(AsyncLoader, StaticPriorities) ASSERT_EQ(schedule, "A9E9D9F9G9H9C4B3"); } +TEST(AsyncLoader, SimplePrioritization) +{ + AsyncLoaderTest t({ + {.max_threads = 1, .priority = 0}, + {.max_threads = 1, .priority = 1}, + {.max_threads = 1, .priority = 2}, + }); + + t.loader.start(); + + std::atomic executed{0}; // Number of previously executed jobs (to test execution order) + LoadJobPtr job_to_prioritize; + + auto job_func_A_booster = [&] (const LoadJobPtr &) + { + ASSERT_EQ(executed++, 0); + t.loader.prioritize(job_to_prioritize, 2); + }; + + auto job_func_B_tester = [&] (const LoadJobPtr &) + { + ASSERT_EQ(executed++, 2); + }; + + auto job_func_C_boosted = [&] (const LoadJobPtr &) + { + ASSERT_EQ(executed++, 1); + }; + + std::vector jobs; + jobs.push_back(makeLoadJob({}, 1, "A", job_func_A_booster)); // 0 + jobs.push_back(makeLoadJob({jobs[0]}, 1, "B", job_func_B_tester)); // 1 + jobs.push_back(makeLoadJob({}, 0, "C", job_func_C_boosted)); // 2 + auto task = makeLoadTask(t.loader, { jobs.begin(), jobs.end() }); + + job_to_prioritize = jobs[2]; // C + + scheduleAndWaitLoadAll(task); +} + TEST(AsyncLoader, DynamicPriorities) { - AsyncLoaderTest t(1); + AsyncLoaderTest t({ + {.max_threads = 1, .priority = 0}, + {.max_threads = 1, .priority = 1}, + {.max_threads = 1, .priority = 2}, + {.max_threads = 1, .priority = 3}, + {.max_threads = 1, .priority = 4}, + {.max_threads = 1, .priority = 5}, + {.max_threads = 1, .priority = 6}, + {.max_threads = 1, .priority = 7}, + {.max_threads = 1, .priority = 8}, + {.max_threads = 1, .priority = 9}, + }); for (bool prioritize : {false, true}) { + // Although all pools have max_threads=1, workers from different pools can run simultaneously just after `prioritize()` call + std::barrier sync(2); + bool wait_sync = prioritize; + std::mutex schedule_mutex; std::string schedule; LoadJobPtr job_to_prioritize; + // Order of execution of jobs D and E after prioritization is undefined, because it depend on `ready_seqno` + // (Which depends on initial `schedule()` order, which in turn depend on `std::unordered_map` order) + // So we have to obtain `ready_seqno` to be sure. + UInt64 ready_seqno_D = 0; + UInt64 ready_seqno_E = 0; + auto job_func = [&] (const LoadJobPtr & self) { + { + std::unique_lock lock{schedule_mutex}; + schedule += fmt::format("{}{}", self->name, self->executionPool()); + } + if (prioritize && self->name == "C") - t.loader.prioritize(job_to_prioritize, 9); // dynamic prioritization - schedule += fmt::format("{}{}", self->name, self->priority()); + { + for (const auto & state : t.loader.getJobStates()) + { + if (state.job->name == "D") + ready_seqno_D = state.ready_seqno; + if (state.job->name == "E") + ready_seqno_E = state.ready_seqno; + } + + // Jobs D and E should be enqueued at the moment + ASSERT_LT(0, ready_seqno_D); + ASSERT_LT(0, ready_seqno_E); + + // Dynamic prioritization G0 -> G9 + // Note that it will spawn concurrent worker in higher priority pool + t.loader.prioritize(job_to_prioritize, 9); + + sync.arrive_and_wait(); // (A) wait for higher priority worker (B) to test they can be concurrent + } + + if (wait_sync && (self->name == "D" || self->name == "E")) + { + wait_sync = false; + sync.arrive_and_wait(); // (B) + } }; // Job DAG with initial priorities. During execution of C4, job G0 priority is increased to G9, postponing B3 job executing. @@ -624,14 +756,19 @@ TEST(AsyncLoader, DynamicPriorities) jobs.push_back(makeLoadJob({ jobs[6] }, 0, "H", job_func)); // 7 auto task = t.schedule({ jobs.begin(), jobs.end() }); - job_to_prioritize = jobs[6]; + job_to_prioritize = jobs[6]; // G t.loader.start(); t.loader.wait(); t.loader.stop(); if (prioritize) - ASSERT_EQ(schedule, "A4C4E9D9F9G9B3H0"); + { + if (ready_seqno_D < ready_seqno_E) + ASSERT_EQ(schedule, "A4C4D9E9F9G9B3H0"); + else + ASSERT_EQ(schedule, "A4C4E9D9F9G9B3H0"); + } else ASSERT_EQ(schedule, "A4C4B3E2D1F0G0H0"); } @@ -742,8 +879,64 @@ TEST(AsyncLoader, SetMaxThreads) syncs[idx]->arrive_and_wait(); // (A) sync_index++; if (sync_index < syncs.size()) - t.loader.setMaxThreads(max_threads_values[sync_index]); + t.loader.setMaxThreads(/* pool = */ 0, max_threads_values[sync_index]); syncs[idx]->arrive_and_wait(); // (B) this sync point is required to allow `executing` value to go back down to zero after we change number of workers } t.loader.wait(); } + +TEST(AsyncLoader, DynamicPools) +{ + const size_t max_threads[] { 2, 10 }; + const int jobs_in_chain = 16; + AsyncLoaderTest t({ + {.max_threads = max_threads[0], .priority = 0}, + {.max_threads = max_threads[1], .priority = 1}, + }); + + t.loader.start(); + + std::atomic executing[2] { 0, 0 }; // Number of currently executing jobs per pool + + for (int concurrency = 1; concurrency <= 12; concurrency++) + { + std::atomic boosted{false}; // Visible concurrency was increased + std::atomic left{concurrency * jobs_in_chain / 2}; // Number of jobs to start before `prioritize()` call + + LoadJobSet jobs_to_prioritize; + + auto job_func = [&] (const LoadJobPtr & self) + { + auto pool_id = self->executionPool(); + executing[pool_id]++; + if (executing[pool_id] > max_threads[0]) + boosted = true; + ASSERT_LE(executing[pool_id], max_threads[pool_id]); + + // Dynamic prioritization + if (--left == 0) + { + for (const auto & job : jobs_to_prioritize) + t.loader.prioritize(job, 1); + } + + t.randomSleepUs(100, 200, 100); + + ASSERT_LE(executing[pool_id], max_threads[pool_id]); + executing[pool_id]--; + }; + + std::vector tasks; + tasks.reserve(concurrency); + for (int i = 0; i < concurrency; i++) + tasks.push_back(makeLoadTask(t.loader, t.chainJobSet(jobs_in_chain, job_func))); + jobs_to_prioritize = getGoals(tasks); // All jobs + scheduleAndWaitLoadAll(tasks); + + ASSERT_EQ(executing[0], 0); + ASSERT_EQ(executing[1], 0); + ASSERT_EQ(boosted, concurrency > 2); + boosted = false; + } + +}