mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-23 08:02:02 +00:00
Merge pull request #49893 from ClickHouse/async-loader-workloads
Multiple pools support for AsyncLoader
This commit is contained in:
commit
6554e7438e
@ -6,6 +6,7 @@
|
||||
#include <Common/noexcept_scope.h>
|
||||
#include <Common/setThreadName.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include <Common/ThreadPool.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -41,9 +42,14 @@ std::exception_ptr LoadJob::exception() const
|
||||
return load_exception;
|
||||
}
|
||||
|
||||
ssize_t LoadJob::priority() const
|
||||
size_t LoadJob::executionPool() const
|
||||
{
|
||||
return load_priority;
|
||||
return execution_pool_id;
|
||||
}
|
||||
|
||||
size_t LoadJob::pool() const
|
||||
{
|
||||
return pool_id;
|
||||
}
|
||||
|
||||
void LoadJob::wait() const
|
||||
@ -112,8 +118,9 @@ void LoadJob::enqueued()
|
||||
enqueue_time = std::chrono::system_clock::now();
|
||||
}
|
||||
|
||||
void LoadJob::execute(const LoadJobPtr & self)
|
||||
void LoadJob::execute(size_t pool, const LoadJobPtr & self)
|
||||
{
|
||||
execution_pool_id = pool;
|
||||
start_time = std::chrono::system_clock::now();
|
||||
func(self);
|
||||
}
|
||||
@ -148,22 +155,35 @@ void LoadTask::remove()
|
||||
{
|
||||
loader.remove(jobs);
|
||||
jobs.clear();
|
||||
goal_jobs.clear();
|
||||
}
|
||||
}
|
||||
|
||||
void LoadTask::detach()
|
||||
{
|
||||
jobs.clear();
|
||||
goal_jobs.clear();
|
||||
}
|
||||
|
||||
AsyncLoader::AsyncLoader(Metric metric_threads, Metric metric_active_threads, size_t max_threads_, bool log_failures_, bool log_progress_)
|
||||
|
||||
AsyncLoader::AsyncLoader(std::vector<PoolInitializer> pool_initializers, bool log_failures_, bool log_progress_)
|
||||
: log_failures(log_failures_)
|
||||
, log_progress(log_progress_)
|
||||
, log(&Poco::Logger::get("AsyncLoader"))
|
||||
, max_threads(max_threads_)
|
||||
, pool(metric_threads, metric_active_threads, max_threads)
|
||||
{
|
||||
|
||||
pools.reserve(pool_initializers.size());
|
||||
for (auto && init : pool_initializers)
|
||||
pools.push_back({
|
||||
.name = init.name,
|
||||
.priority = init.priority,
|
||||
.thread_pool = std::make_unique<ThreadPool>(
|
||||
init.metric_threads,
|
||||
init.metric_active_threads,
|
||||
init.max_threads,
|
||||
/* max_free_threads = */ 0,
|
||||
init.max_threads),
|
||||
.max_threads = init.max_threads
|
||||
});
|
||||
}
|
||||
|
||||
AsyncLoader::~AsyncLoader()
|
||||
@ -175,13 +195,20 @@ void AsyncLoader::start()
|
||||
{
|
||||
std::unique_lock lock{mutex};
|
||||
is_running = true;
|
||||
for (size_t i = 0; workers < max_threads && i < ready_queue.size(); i++)
|
||||
spawn(lock);
|
||||
updateCurrentPriorityAndSpawn(lock);
|
||||
}
|
||||
|
||||
void AsyncLoader::wait()
|
||||
{
|
||||
pool.wait();
|
||||
// Because job can create new jobs in other pools we have to recheck in cycle
|
||||
std::unique_lock lock{mutex};
|
||||
while (!scheduled_jobs.empty())
|
||||
{
|
||||
lock.unlock();
|
||||
for (auto & p : pools)
|
||||
p.thread_pool->wait();
|
||||
lock.lock();
|
||||
}
|
||||
}
|
||||
|
||||
void AsyncLoader::stop()
|
||||
@ -191,7 +218,7 @@ void AsyncLoader::stop()
|
||||
is_running = false;
|
||||
// NOTE: there is no need to notify because workers never wait
|
||||
}
|
||||
pool.wait();
|
||||
wait();
|
||||
}
|
||||
|
||||
void AsyncLoader::schedule(LoadTask & task)
|
||||
@ -229,9 +256,9 @@ void AsyncLoader::scheduleImpl(const LoadJobSet & input_jobs)
|
||||
old_jobs = finished_jobs.size();
|
||||
}
|
||||
|
||||
// Make set of jobs to schedule:
|
||||
// Pass 1. Make set of jobs to schedule:
|
||||
// 1) exclude already scheduled or finished jobs
|
||||
// 2) include pending dependencies, that are not yet scheduled
|
||||
// 2) include assigned job dependencies (that are not yet scheduled)
|
||||
LoadJobSet jobs;
|
||||
for (const auto & job : input_jobs)
|
||||
gatherNotScheduled(job, jobs, lock);
|
||||
@ -242,17 +269,18 @@ void AsyncLoader::scheduleImpl(const LoadJobSet & input_jobs)
|
||||
// We do not want any exception to be throws after this point, because the following code is not exception-safe
|
||||
DENY_ALLOCATIONS_IN_SCOPE;
|
||||
|
||||
// Schedule all incoming jobs
|
||||
// Pass 2. Schedule all incoming jobs
|
||||
for (const auto & job : jobs)
|
||||
{
|
||||
chassert(job->pool() < pools.size());
|
||||
NOEXCEPT_SCOPE({
|
||||
ALLOW_ALLOCATIONS_IN_SCOPE;
|
||||
scheduled_jobs.emplace(job, Info{.initial_priority = job->load_priority, .priority = job->load_priority});
|
||||
scheduled_jobs.try_emplace(job);
|
||||
job->scheduled();
|
||||
});
|
||||
}
|
||||
|
||||
// Process dependencies on scheduled pending jobs
|
||||
// Pass 3. Process dependencies on scheduled jobs, priority inheritance
|
||||
for (const auto & job : jobs)
|
||||
{
|
||||
Info & info = scheduled_jobs.find(job)->second;
|
||||
@ -267,17 +295,18 @@ void AsyncLoader::scheduleImpl(const LoadJobSet & input_jobs)
|
||||
});
|
||||
info.dependencies_left++;
|
||||
|
||||
// Priority inheritance: prioritize deps to have at least given `priority` to avoid priority inversion
|
||||
prioritize(dep, info.priority, lock);
|
||||
// Priority inheritance: prioritize deps to have at least given `pool.priority` to avoid priority inversion
|
||||
prioritize(dep, job->pool_id, lock);
|
||||
}
|
||||
}
|
||||
|
||||
// Enqueue non-blocked jobs (w/o dependencies) to ready queue
|
||||
if (!info.is_blocked())
|
||||
if (!info.isBlocked())
|
||||
enqueue(info, job, lock);
|
||||
}
|
||||
|
||||
// Process dependencies on other jobs. It is done in a separate pass to facilitate propagation of cancel signals (if any).
|
||||
// Pass 4: Process dependencies on other jobs.
|
||||
// It is done in a separate pass to facilitate cancelling due to already failed dependencies.
|
||||
for (const auto & job : jobs)
|
||||
{
|
||||
if (auto info = scheduled_jobs.find(job); info != scheduled_jobs.end())
|
||||
@ -285,12 +314,12 @@ void AsyncLoader::scheduleImpl(const LoadJobSet & input_jobs)
|
||||
for (const auto & dep : job->dependencies)
|
||||
{
|
||||
if (scheduled_jobs.contains(dep))
|
||||
continue; // Skip dependencies on scheduled pending jobs (already processed)
|
||||
continue; // Skip dependencies on scheduled jobs (already processed in pass 3)
|
||||
LoadStatus dep_status = dep->status();
|
||||
if (dep_status == LoadStatus::OK)
|
||||
continue; // Dependency on already successfully finished job -- it's okay.
|
||||
|
||||
// Dependency on not scheduled pending job -- it's bad.
|
||||
// Dependency on assigned job -- it's bad.
|
||||
// Probably, there is an error in `jobs` set, `gatherNotScheduled()` should have fixed it.
|
||||
chassert(dep_status != LoadStatus::PENDING);
|
||||
|
||||
@ -305,7 +334,7 @@ void AsyncLoader::scheduleImpl(const LoadJobSet & input_jobs)
|
||||
job->name,
|
||||
getExceptionMessage(dep->exception(), /* with_stacktrace = */ false)));
|
||||
});
|
||||
finish(lock, job, LoadStatus::CANCELED, e);
|
||||
finish(job, LoadStatus::CANCELED, e, lock);
|
||||
break; // This job is now finished, stop its dependencies processing
|
||||
}
|
||||
}
|
||||
@ -327,13 +356,14 @@ void AsyncLoader::gatherNotScheduled(const LoadJobPtr & job, LoadJobSet & jobs,
|
||||
}
|
||||
}
|
||||
|
||||
void AsyncLoader::prioritize(const LoadJobPtr & job, ssize_t new_priority)
|
||||
void AsyncLoader::prioritize(const LoadJobPtr & job, size_t new_pool)
|
||||
{
|
||||
if (!job)
|
||||
return;
|
||||
chassert(new_pool < pools.size());
|
||||
DENY_ALLOCATIONS_IN_SCOPE;
|
||||
std::unique_lock lock{mutex};
|
||||
prioritize(job, new_priority, lock);
|
||||
prioritize(job, new_pool, lock);
|
||||
}
|
||||
|
||||
void AsyncLoader::remove(const LoadJobSet & jobs)
|
||||
@ -347,14 +377,14 @@ void AsyncLoader::remove(const LoadJobSet & jobs)
|
||||
{
|
||||
if (auto info = scheduled_jobs.find(job); info != scheduled_jobs.end())
|
||||
{
|
||||
if (info->second.is_executing())
|
||||
if (info->second.isExecuting())
|
||||
continue; // Skip executing jobs on the first pass
|
||||
std::exception_ptr e;
|
||||
NOEXCEPT_SCOPE({
|
||||
ALLOW_ALLOCATIONS_IN_SCOPE;
|
||||
e = std::make_exception_ptr(Exception(ErrorCodes::ASYNC_LOAD_CANCELED, "Load job '{}' canceled", job->name));
|
||||
});
|
||||
finish(lock, job, LoadStatus::CANCELED, e);
|
||||
finish(job, LoadStatus::CANCELED, e, lock);
|
||||
}
|
||||
}
|
||||
// On the second pass wait for executing jobs to finish
|
||||
@ -363,7 +393,7 @@ void AsyncLoader::remove(const LoadJobSet & jobs)
|
||||
if (auto info = scheduled_jobs.find(job); info != scheduled_jobs.end())
|
||||
{
|
||||
// Job is currently executing
|
||||
chassert(info->second.is_executing());
|
||||
chassert(info->second.isExecuting());
|
||||
lock.unlock();
|
||||
job->waitNoThrow(); // Wait for job to finish
|
||||
lock.lock();
|
||||
@ -379,25 +409,36 @@ void AsyncLoader::remove(const LoadJobSet & jobs)
|
||||
}
|
||||
}
|
||||
|
||||
void AsyncLoader::setMaxThreads(size_t value)
|
||||
void AsyncLoader::setMaxThreads(size_t pool, size_t value)
|
||||
{
|
||||
std::unique_lock lock{mutex};
|
||||
pool.setMaxThreads(value);
|
||||
pool.setMaxFreeThreads(value);
|
||||
pool.setQueueSize(value);
|
||||
max_threads = value;
|
||||
auto & p = pools[pool];
|
||||
p.thread_pool->setMaxThreads(value);
|
||||
p.thread_pool->setQueueSize(value); // Keep queue size equal max threads count to avoid blocking during spawning
|
||||
p.max_threads = value;
|
||||
if (!is_running)
|
||||
return;
|
||||
for (size_t i = 0; workers < max_threads && i < ready_queue.size(); i++)
|
||||
spawn(lock);
|
||||
for (size_t i = 0; canSpawnWorker(p, lock) && i < p.ready_queue.size(); i++)
|
||||
spawn(p, lock);
|
||||
}
|
||||
|
||||
size_t AsyncLoader::getMaxThreads() const
|
||||
size_t AsyncLoader::getMaxThreads(size_t pool) const
|
||||
{
|
||||
std::unique_lock lock{mutex};
|
||||
return max_threads;
|
||||
return pools[pool].max_threads;
|
||||
}
|
||||
|
||||
const String & AsyncLoader::getPoolName(size_t pool) const
|
||||
{
|
||||
return pools[pool].name; // NOTE: lock is not needed because `name` is const and `pools` are immutable
|
||||
}
|
||||
|
||||
ssize_t AsyncLoader::getPoolPriority(size_t pool) const
|
||||
{
|
||||
return pools[pool].priority; // NOTE: lock is not needed because `priority` is const and `pools` are immutable
|
||||
}
|
||||
|
||||
|
||||
size_t AsyncLoader::getScheduledJobCount() const
|
||||
{
|
||||
std::unique_lock lock{mutex};
|
||||
@ -412,11 +453,10 @@ std::vector<AsyncLoader::JobState> AsyncLoader::getJobStates() const
|
||||
states.emplace(job->name, JobState{
|
||||
.job = job,
|
||||
.dependencies_left = info.dependencies_left,
|
||||
.is_executing = info.is_executing(),
|
||||
.is_blocked = info.is_blocked(),
|
||||
.is_ready = info.is_ready(),
|
||||
.initial_priority = info.initial_priority,
|
||||
.ready_seqno = last_ready_seqno
|
||||
.ready_seqno = info.ready_seqno,
|
||||
.is_blocked = info.isBlocked(),
|
||||
.is_ready = info.isReady(),
|
||||
.is_executing = info.isExecuting()
|
||||
});
|
||||
for (const auto & job : finished_jobs)
|
||||
states.emplace(job->name, JobState{.job = job});
|
||||
@ -462,21 +502,21 @@ String AsyncLoader::checkCycleImpl(const LoadJobPtr & job, LoadJobSet & left, Lo
|
||||
return {};
|
||||
}
|
||||
|
||||
void AsyncLoader::finish(std::unique_lock<std::mutex> & lock, const LoadJobPtr & job, LoadStatus status, std::exception_ptr exception_from_job)
|
||||
void AsyncLoader::finish(const LoadJobPtr & job, LoadStatus status, std::exception_ptr exception_from_job, std::unique_lock<std::mutex> & lock)
|
||||
{
|
||||
chassert(scheduled_jobs.contains(job)); // Job was pending
|
||||
if (status == LoadStatus::OK)
|
||||
{
|
||||
// Notify waiters
|
||||
job->ok();
|
||||
|
||||
// Update dependent jobs and enqueue if ready
|
||||
chassert(scheduled_jobs.contains(job)); // Job was pending
|
||||
for (const auto & dep : scheduled_jobs[job].dependent_jobs)
|
||||
{
|
||||
chassert(scheduled_jobs.contains(dep)); // All depended jobs must be pending
|
||||
Info & dep_info = scheduled_jobs[dep];
|
||||
dep_info.dependencies_left--;
|
||||
if (!dep_info.is_blocked())
|
||||
if (!dep_info.isBlocked())
|
||||
enqueue(dep_info, dep, lock);
|
||||
}
|
||||
}
|
||||
@ -488,11 +528,10 @@ void AsyncLoader::finish(std::unique_lock<std::mutex> & lock, const LoadJobPtr &
|
||||
else if (status == LoadStatus::CANCELED)
|
||||
job->canceled(exception_from_job);
|
||||
|
||||
chassert(scheduled_jobs.contains(job)); // Job was pending
|
||||
Info & info = scheduled_jobs[job];
|
||||
if (info.is_ready())
|
||||
if (info.isReady())
|
||||
{
|
||||
ready_queue.erase(info.key());
|
||||
pools[job->pool_id].ready_queue.erase(info.ready_seqno);
|
||||
info.ready_seqno = 0;
|
||||
}
|
||||
|
||||
@ -512,7 +551,7 @@ void AsyncLoader::finish(std::unique_lock<std::mutex> & lock, const LoadJobPtr &
|
||||
dep->name,
|
||||
getExceptionMessage(exception_from_job, /* with_stacktrace = */ false)));
|
||||
});
|
||||
finish(lock, dep, LoadStatus::CANCELED, e);
|
||||
finish(dep, LoadStatus::CANCELED, e, lock);
|
||||
}
|
||||
|
||||
// Clean dependency graph edges pointing to canceled jobs
|
||||
@ -531,87 +570,130 @@ void AsyncLoader::finish(std::unique_lock<std::mutex> & lock, const LoadJobPtr &
|
||||
});
|
||||
}
|
||||
|
||||
void AsyncLoader::prioritize(const LoadJobPtr & job, ssize_t new_priority, std::unique_lock<std::mutex> & lock)
|
||||
void AsyncLoader::prioritize(const LoadJobPtr & job, size_t new_pool_id, std::unique_lock<std::mutex> & lock)
|
||||
{
|
||||
if (auto info = scheduled_jobs.find(job); info != scheduled_jobs.end())
|
||||
{
|
||||
if (info->second.priority >= new_priority)
|
||||
return; // Never lower priority
|
||||
Pool & old_pool = pools[job->pool_id];
|
||||
Pool & new_pool = pools[new_pool_id];
|
||||
if (old_pool.priority >= new_pool.priority)
|
||||
return; // Never lower priority or change pool leaving the same priority
|
||||
|
||||
// Update priority and push job forward through ready queue if needed
|
||||
if (info->second.ready_seqno)
|
||||
ready_queue.erase(info->second.key());
|
||||
info->second.priority = new_priority;
|
||||
job->load_priority.store(new_priority); // Set user-facing priority (may affect executing jobs)
|
||||
if (info->second.ready_seqno)
|
||||
UInt64 ready_seqno = info->second.ready_seqno;
|
||||
|
||||
// Requeue job into the new pool queue without allocations
|
||||
if (ready_seqno)
|
||||
{
|
||||
NOEXCEPT_SCOPE({
|
||||
ALLOW_ALLOCATIONS_IN_SCOPE;
|
||||
ready_queue.emplace(info->second.key(), job);
|
||||
});
|
||||
new_pool.ready_queue.insert(old_pool.ready_queue.extract(ready_seqno));
|
||||
if (canSpawnWorker(new_pool, lock))
|
||||
spawn(new_pool, lock);
|
||||
}
|
||||
|
||||
// Set user-facing pool and priority (may affect executing jobs)
|
||||
job->pool_id.store(new_pool_id);
|
||||
|
||||
// Recurse into dependencies
|
||||
for (const auto & dep : job->dependencies)
|
||||
prioritize(dep, new_priority, lock);
|
||||
prioritize(dep, new_pool_id, lock);
|
||||
}
|
||||
}
|
||||
|
||||
void AsyncLoader::enqueue(Info & info, const LoadJobPtr & job, std::unique_lock<std::mutex> & lock)
|
||||
{
|
||||
chassert(!info.is_blocked());
|
||||
chassert(!info.isBlocked());
|
||||
chassert(info.ready_seqno == 0);
|
||||
info.ready_seqno = ++last_ready_seqno;
|
||||
Pool & pool = pools[job->pool_id];
|
||||
NOEXCEPT_SCOPE({
|
||||
ALLOW_ALLOCATIONS_IN_SCOPE;
|
||||
ready_queue.emplace(info.key(), job);
|
||||
pool.ready_queue.emplace(info.ready_seqno, job);
|
||||
});
|
||||
|
||||
job->enqueued();
|
||||
|
||||
if (is_running && workers < max_threads)
|
||||
spawn(lock);
|
||||
if (canSpawnWorker(pool, lock))
|
||||
spawn(pool, lock);
|
||||
}
|
||||
|
||||
void AsyncLoader::spawn(std::unique_lock<std::mutex> &)
|
||||
bool AsyncLoader::canSpawnWorker(Pool & pool, std::unique_lock<std::mutex> &)
|
||||
{
|
||||
workers++;
|
||||
return is_running
|
||||
&& !pool.ready_queue.empty()
|
||||
&& pool.workers < pool.max_threads
|
||||
&& (!current_priority || *current_priority <= pool.priority);
|
||||
}
|
||||
|
||||
bool AsyncLoader::canWorkerLive(Pool & pool, std::unique_lock<std::mutex> &)
|
||||
{
|
||||
return is_running
|
||||
&& !pool.ready_queue.empty()
|
||||
&& pool.workers <= pool.max_threads
|
||||
&& (!current_priority || *current_priority <= pool.priority);
|
||||
}
|
||||
|
||||
void AsyncLoader::updateCurrentPriorityAndSpawn(std::unique_lock<std::mutex> & lock)
|
||||
{
|
||||
// Find current priority.
|
||||
// NOTE: We assume low number of pools, so O(N) scans are fine.
|
||||
std::optional<ssize_t> priority;
|
||||
for (Pool & pool : pools)
|
||||
{
|
||||
if (pool.isActive() && (!priority || *priority < pool.priority))
|
||||
priority = pool.priority;
|
||||
}
|
||||
current_priority = priority;
|
||||
|
||||
// Spawn workers in all pools with current priority
|
||||
for (Pool & pool : pools)
|
||||
{
|
||||
for (size_t i = 0; canSpawnWorker(pool, lock) && i < pool.ready_queue.size(); i++)
|
||||
spawn(pool, lock);
|
||||
}
|
||||
}
|
||||
|
||||
void AsyncLoader::spawn(Pool & pool, std::unique_lock<std::mutex> &)
|
||||
{
|
||||
pool.workers++;
|
||||
current_priority = pool.priority; // canSpawnWorker() ensures this would not decrease current_priority
|
||||
NOEXCEPT_SCOPE({
|
||||
ALLOW_ALLOCATIONS_IN_SCOPE;
|
||||
pool.scheduleOrThrowOnError([this] { worker(); });
|
||||
pool.thread_pool->scheduleOrThrowOnError([this, &pool] { worker(pool); });
|
||||
});
|
||||
}
|
||||
|
||||
void AsyncLoader::worker()
|
||||
void AsyncLoader::worker(Pool & pool)
|
||||
{
|
||||
DENY_ALLOCATIONS_IN_SCOPE;
|
||||
|
||||
size_t pool_id = &pool - &*pools.begin();
|
||||
LoadJobPtr job;
|
||||
std::exception_ptr exception_from_job;
|
||||
while (true)
|
||||
{
|
||||
// This is inside the loop to also reset previous thread names set inside the jobs
|
||||
setThreadName("AsyncLoader");
|
||||
setThreadName(pool.name.c_str());
|
||||
|
||||
{
|
||||
std::unique_lock lock{mutex};
|
||||
|
||||
// Handle just executed job
|
||||
if (exception_from_job)
|
||||
finish(lock, job, LoadStatus::FAILED, exception_from_job);
|
||||
finish(job, LoadStatus::FAILED, exception_from_job, lock);
|
||||
else if (job)
|
||||
finish(lock, job, LoadStatus::OK);
|
||||
finish(job, LoadStatus::OK, {}, lock);
|
||||
|
||||
if (!is_running || ready_queue.empty() || workers > max_threads)
|
||||
if (!canWorkerLive(pool, lock))
|
||||
{
|
||||
workers--;
|
||||
if (--pool.workers == 0)
|
||||
updateCurrentPriorityAndSpawn(lock); // It will spawn lower priority workers if needed
|
||||
return;
|
||||
}
|
||||
|
||||
// Take next job to be executed from the ready queue
|
||||
auto it = ready_queue.begin();
|
||||
auto it = pool.ready_queue.begin();
|
||||
job = it->second;
|
||||
ready_queue.erase(it);
|
||||
pool.ready_queue.erase(it);
|
||||
scheduled_jobs.find(job)->second.ready_seqno = 0; // This job is no longer in the ready queue
|
||||
}
|
||||
|
||||
@ -619,7 +701,7 @@ void AsyncLoader::worker()
|
||||
|
||||
try
|
||||
{
|
||||
job->execute(job);
|
||||
job->execute(pool_id, job);
|
||||
exception_from_job = {};
|
||||
}
|
||||
catch (...)
|
||||
|
@ -12,7 +12,7 @@
|
||||
#include <base/types.h>
|
||||
#include <Common/CurrentMetrics.h>
|
||||
#include <Common/Stopwatch.h>
|
||||
#include <Common/ThreadPool.h>
|
||||
#include <Common/ThreadPool_fwd.h>
|
||||
|
||||
|
||||
namespace Poco { class Logger; }
|
||||
@ -46,22 +46,28 @@ class LoadJob : private boost::noncopyable
|
||||
{
|
||||
public:
|
||||
template <class Func, class LoadJobSetType>
|
||||
LoadJob(LoadJobSetType && dependencies_, String name_, Func && func_, ssize_t priority_ = 0)
|
||||
LoadJob(LoadJobSetType && dependencies_, String name_, size_t pool_id_, Func && func_)
|
||||
: dependencies(std::forward<LoadJobSetType>(dependencies_))
|
||||
, name(std::move(name_))
|
||||
, pool_id(pool_id_)
|
||||
, func(std::forward<Func>(func_))
|
||||
, load_priority(priority_)
|
||||
{}
|
||||
|
||||
// Current job status.
|
||||
LoadStatus status() const;
|
||||
std::exception_ptr exception() const;
|
||||
|
||||
// Returns current value of a priority of the job. May differ from initial priority.
|
||||
ssize_t priority() const;
|
||||
// Returns pool in which the job is executing (was executed). May differ from initial pool and from current pool.
|
||||
// Value is only valid (and constant) after execution started.
|
||||
size_t executionPool() const;
|
||||
|
||||
// Returns current pool of the job. May differ from initial and execution pool.
|
||||
// This value is intended for creating new jobs during this job execution.
|
||||
// Value may change during job execution by `prioritize()`.
|
||||
size_t pool() const;
|
||||
|
||||
// Sync wait for a pending job to be finished: OK, FAILED or CANCELED status.
|
||||
// Throws if job is FAILED or CANCELED. Returns or throws immediately on non-pending job.
|
||||
// Throws if job is FAILED or CANCELED. Returns or throws immediately if called on non-pending job.
|
||||
void wait() const;
|
||||
|
||||
// Wait for a job to reach any non PENDING status.
|
||||
@ -90,10 +96,11 @@ private:
|
||||
|
||||
void scheduled();
|
||||
void enqueued();
|
||||
void execute(const LoadJobPtr & self);
|
||||
void execute(size_t pool, const LoadJobPtr & self);
|
||||
|
||||
std::atomic<size_t> execution_pool_id;
|
||||
std::atomic<size_t> pool_id;
|
||||
std::function<void(const LoadJobPtr & self)> func;
|
||||
std::atomic<ssize_t> load_priority;
|
||||
|
||||
mutable std::mutex mutex;
|
||||
mutable std::condition_variable finished;
|
||||
@ -115,25 +122,25 @@ struct EmptyJobFunc
|
||||
template <class Func = EmptyJobFunc>
|
||||
LoadJobPtr makeLoadJob(LoadJobSet && dependencies, String name, Func && func = EmptyJobFunc())
|
||||
{
|
||||
return std::make_shared<LoadJob>(std::move(dependencies), std::move(name), std::forward<Func>(func));
|
||||
return std::make_shared<LoadJob>(std::move(dependencies), std::move(name), 0, std::forward<Func>(func));
|
||||
}
|
||||
|
||||
template <class Func = EmptyJobFunc>
|
||||
LoadJobPtr makeLoadJob(const LoadJobSet & dependencies, String name, Func && func = EmptyJobFunc())
|
||||
{
|
||||
return std::make_shared<LoadJob>(dependencies, std::move(name), std::forward<Func>(func));
|
||||
return std::make_shared<LoadJob>(dependencies, std::move(name), 0, std::forward<Func>(func));
|
||||
}
|
||||
|
||||
template <class Func = EmptyJobFunc>
|
||||
LoadJobPtr makeLoadJob(LoadJobSet && dependencies, ssize_t priority, String name, Func && func = EmptyJobFunc())
|
||||
LoadJobPtr makeLoadJob(LoadJobSet && dependencies, size_t pool_id, String name, Func && func = EmptyJobFunc())
|
||||
{
|
||||
return std::make_shared<LoadJob>(std::move(dependencies), std::move(name), std::forward<Func>(func), priority);
|
||||
return std::make_shared<LoadJob>(std::move(dependencies), std::move(name), pool_id, std::forward<Func>(func));
|
||||
}
|
||||
|
||||
template <class Func = EmptyJobFunc>
|
||||
LoadJobPtr makeLoadJob(const LoadJobSet & dependencies, ssize_t priority, String name, Func && func = EmptyJobFunc())
|
||||
LoadJobPtr makeLoadJob(const LoadJobSet & dependencies, size_t pool_id, String name, Func && func = EmptyJobFunc())
|
||||
{
|
||||
return std::make_shared<LoadJob>(dependencies, std::move(name), std::forward<Func>(func), priority);
|
||||
return std::make_shared<LoadJob>(dependencies, std::move(name), pool_id, std::forward<Func>(func));
|
||||
}
|
||||
|
||||
// Represents a logically connected set of LoadJobs required to achieve some goals (final LoadJob in the set).
|
||||
@ -185,7 +192,7 @@ inline void scheduleLoad(const LoadTaskPtrs & tasks)
|
||||
}
|
||||
|
||||
template <class... Args>
|
||||
inline void scheduleLoad(Args && ... args)
|
||||
inline void scheduleLoadAll(Args && ... args)
|
||||
{
|
||||
(scheduleLoad(std::forward<Args>(args)), ...);
|
||||
}
|
||||
@ -208,16 +215,16 @@ inline void waitLoad(const LoadTaskPtrs & tasks)
|
||||
}
|
||||
|
||||
template <class... Args>
|
||||
inline void waitLoad(Args && ... args)
|
||||
inline void waitLoadAll(Args && ... args)
|
||||
{
|
||||
(waitLoad(std::forward<Args>(args)), ...);
|
||||
}
|
||||
|
||||
template <class... Args>
|
||||
inline void scheduleAndWaitLoad(Args && ... args)
|
||||
inline void scheduleAndWaitLoadAll(Args && ... args)
|
||||
{
|
||||
scheduleLoad(std::forward<Args>(args)...);
|
||||
waitLoad(std::forward<Args>(args)...);
|
||||
scheduleLoadAll(std::forward<Args>(args)...);
|
||||
waitLoadAll(std::forward<Args>(args)...);
|
||||
}
|
||||
|
||||
inline LoadJobSet getGoals(const LoadTaskPtrs & tasks)
|
||||
@ -228,6 +235,14 @@ inline LoadJobSet getGoals(const LoadTaskPtrs & tasks)
|
||||
return result;
|
||||
}
|
||||
|
||||
inline LoadJobSet getGoalsOr(const LoadTaskPtrs & tasks, const LoadJobSet & alternative)
|
||||
{
|
||||
LoadJobSet result;
|
||||
for (const auto & task : tasks)
|
||||
result.insert(task->goals().begin(), task->goals().end());
|
||||
return result.empty() ? alternative : result;
|
||||
}
|
||||
|
||||
inline LoadJobSet joinJobs(const LoadJobSet & jobs1, const LoadJobSet & jobs2)
|
||||
{
|
||||
LoadJobSet result;
|
||||
@ -251,100 +266,117 @@ inline LoadTaskPtrs joinTasks(const LoadTaskPtrs & tasks1, const LoadTaskPtrs &
|
||||
return result;
|
||||
}
|
||||
|
||||
// `AsyncLoader` is a scheduler for DAG of `LoadJob`s. It tracks dependencies and priorities of jobs.
|
||||
// `AsyncLoader` is a scheduler for DAG of `LoadJob`s. It tracks job dependencies and priorities.
|
||||
// Basic usage example:
|
||||
// // Start async_loader with two thread pools (0=bg, 1=fg):
|
||||
// AsyncLoader async_loader({
|
||||
// {"BgPool", CurrentMetrics::AsyncLoaderThreads, CurrentMetrics::AsyncLoaderThreadsActive, .max_threads = 1, .priority = 0}
|
||||
// {"FgPool", CurrentMetrics::AsyncLoaderThreads, CurrentMetrics::AsyncLoaderThreadsActive, .max_threads = 2, .priority = 1}
|
||||
// });
|
||||
//
|
||||
// // Create and schedule a task consisting of three jobs. Job1 has no dependencies and is run first.
|
||||
// // Job2 and job3 depend on job1 and are run only after job1 completion.
|
||||
// auto job_func = [&] (const LoadJobPtr & self) {
|
||||
// LOG_TRACE(log, "Executing load job '{}' with priority '{}'", self->name, self->priority());
|
||||
// LOG_TRACE(log, "Executing load job '{}' in pool '{}'", self->name, async_loader->getPoolName(self->pool()));
|
||||
// };
|
||||
// auto job1 = makeLoadJob({}, "job1", job_func);
|
||||
// auto job2 = makeLoadJob({ job1 }, "job2", job_func);
|
||||
// auto job3 = makeLoadJob({ job1 }, "job3", job_func);
|
||||
// auto job1 = makeLoadJob({}, "job1", /* pool_id = */ 0, job_func);
|
||||
// auto job2 = makeLoadJob({ job1 }, "job2", /* pool_id = */ 0, job_func);
|
||||
// auto job3 = makeLoadJob({ job1 }, "job3", /* pool_id = */ 0, job_func);
|
||||
// auto task = makeLoadTask(async_loader, { job1, job2, job3 });
|
||||
// task.schedule();
|
||||
// Here we have created and scheduled a task consisting of three jobs. Job1 has no dependencies and is run first.
|
||||
// Job2 and job3 depend on job1 and are run only after job1 completion. Another thread may prioritize a job and wait for it:
|
||||
// async_loader->prioritize(job3, /* priority = */ 1); // higher priority jobs are run first, default priority is zero.
|
||||
//
|
||||
// // Another thread may prioritize a job by changing its pool and wait for it:
|
||||
// async_loader->prioritize(job3, /* pool_id = */ 1); // higher priority jobs are run first, default priority is zero.
|
||||
// job3->wait(); // blocks until job completion or cancellation and rethrow an exception (if any)
|
||||
//
|
||||
// AsyncLoader tracks state of all scheduled jobs. Job lifecycle is the following:
|
||||
// 1) Job is constructed with PENDING status and initial priority. The job is placed into a task.
|
||||
// 2) The task is scheduled with all its jobs and their dependencies. A scheduled job may be ready (i.e. have all its dependencies finished) or blocked.
|
||||
// 3a) When all dependencies are successfully executed, the job became ready. A ready job is enqueued into the ready queue.
|
||||
// Every job has a pool associated with it. AsyncLoader starts every job in its thread pool.
|
||||
// Each pool has a constant priority and a mutable maximum number of threads.
|
||||
// Higher priority (greater `pool.priority` value) jobs are run first.
|
||||
// No job with lower priority is started while there is at least one higher priority job ready or running.
|
||||
//
|
||||
// Job priority can be elevated (but cannot be lowered)
|
||||
// (a) if either it has a dependent job with higher priority:
|
||||
// in this case the priority and the pool of a dependent job is inherited during `schedule()` call;
|
||||
// (b) or job was explicitly prioritized by `prioritize(job, higher_priority_pool)` call:
|
||||
// this also leads to a priority inheritance for all the dependencies.
|
||||
// Value stored in load job `pool_id` field is atomic and can be changed even during job execution.
|
||||
// Job is, of course, not moved from its initial thread pool, but it should use `self->pool()` for
|
||||
// all new jobs it create to avoid priority inversion.
|
||||
//
|
||||
// === IMPLEMENTATION DETAILS ===
|
||||
// All possible states and statuses of a job:
|
||||
// .---------- scheduled ----------.
|
||||
// ctor --> assigned --> blocked --> ready --> executing --> finished ------> removed --> dtor
|
||||
// STATUS: '------------------ PENDING -----------------' '-- OK|FAILED|CANCELED --'
|
||||
//
|
||||
// AsyncLoader tracks state of all scheduled and finished jobs. Job lifecycle is the following:
|
||||
// 1) A job is constructed with PENDING status and assigned to a pool. The job is placed into a task.
|
||||
// 2) The task is scheduled with all its jobs and their dependencies. A scheduled job may be ready, blocked (and later executing).
|
||||
// 3a) When all dependencies are successfully finished, the job became ready. A ready job is enqueued into the ready queue of its pool.
|
||||
// 3b) If at least one of the job dependencies is failed or canceled, then this job is canceled (with all it's dependent jobs as well).
|
||||
// On cancellation an ASYNC_LOAD_CANCELED exception is generated and saved inside LoadJob object. The job status is changed to CANCELED.
|
||||
// Exception is rethrown by any existing or new `wait()` call. The job is moved to the set of the finished jobs.
|
||||
// 4) The scheduled pending ready job starts execution by a worker. The job is dequeued. Callback `job_func` is called.
|
||||
// Status of an executing job is PENDING. And it is still considered as a scheduled job by AsyncLoader.
|
||||
// Note that `job_func` of a CANCELED job is never executed.
|
||||
// 4) The ready job starts execution by a worker. The job is dequeued. Callback `job_func` is called.
|
||||
// Status of an executing job is PENDING. Note that `job_func` of a CANCELED job is never executed.
|
||||
// 5a) On successful execution the job status is changed to OK and all existing and new `wait()` calls finish w/o exceptions.
|
||||
// 5b) Any exception thrown out of `job_func` is wrapped into an ASYNC_LOAD_FAILED exception and saved inside LoadJob.
|
||||
// The job status is changed to FAILED. All the dependent jobs are canceled. The exception is rethrown from all existing and new `wait()` calls.
|
||||
// 6) The job is no longer considered as scheduled and is instead moved to the finished jobs set. This is just for introspection of the finished jobs.
|
||||
// 7) The task containing this job is destructed or `remove()` is explicitly called. The job is removed from the finished job set.
|
||||
// 8) The job is destructed.
|
||||
//
|
||||
// Every job has a priority associated with it. AsyncLoader runs higher priority (greater `priority` value) jobs first. Job priority can be elevated
|
||||
// (a) if either it has a dependent job with higher priority (in this case priority of a dependent job is inherited);
|
||||
// (b) or job was explicitly prioritized by `prioritize(job, higher_priority)` call (this also leads to a priority inheritance for all the dependencies).
|
||||
// Note that to avoid priority inversion `job_func` should use `self->priority()` to schedule new jobs in AsyncLoader or any other pool.
|
||||
// Value stored in load job priority field is atomic and can be increased even during job execution.
|
||||
//
|
||||
// When a task is scheduled it can contain dependencies on previously scheduled jobs. These jobs can have any status. If job A being scheduled depends on
|
||||
// another job B that is not yet scheduled, then job B will also be scheduled (even if the task does not contain it).
|
||||
class AsyncLoader : private boost::noncopyable
|
||||
{
|
||||
private:
|
||||
// Key of a pending job in the ready queue.
|
||||
struct ReadyKey
|
||||
// Thread pool for job execution.
|
||||
// Pools control the following aspects of job execution:
|
||||
// 1) Concurrency: Amount of concurrently executing jobs in a pool is `max_threads`.
|
||||
// 2) Priority: As long as there is executing worker with higher priority, workers with lower priorities are not started
|
||||
// (although, they can finish last job started before higher priority jobs appeared)
|
||||
struct Pool
|
||||
{
|
||||
ssize_t priority; // Ascending order
|
||||
ssize_t initial_priority; // Ascending order
|
||||
UInt64 ready_seqno; // Descending order
|
||||
const String name;
|
||||
const ssize_t priority;
|
||||
std::unique_ptr<ThreadPool> thread_pool; // NOTE: we avoid using a `ThreadPool` queue to be able to move jobs between pools.
|
||||
std::map<UInt64, LoadJobPtr> ready_queue; // FIFO queue of jobs to be executed in this pool. Map is used for faster erasing. Key is `ready_seqno`
|
||||
size_t max_threads; // Max number of workers to be spawn
|
||||
size_t workers = 0; // Number of currently execution workers
|
||||
|
||||
bool operator<(const ReadyKey & rhs) const
|
||||
{
|
||||
if (priority > rhs.priority)
|
||||
return true;
|
||||
if (priority < rhs.priority)
|
||||
return false;
|
||||
if (initial_priority > rhs.initial_priority)
|
||||
return true;
|
||||
if (initial_priority < rhs.initial_priority)
|
||||
return false;
|
||||
return ready_seqno < rhs.ready_seqno;
|
||||
}
|
||||
bool isActive() const { return workers > 0 || !ready_queue.empty(); }
|
||||
};
|
||||
|
||||
// Scheduling information for a pending job.
|
||||
struct Info
|
||||
{
|
||||
ssize_t initial_priority = 0; // Initial priority passed into schedule().
|
||||
ssize_t priority = 0; // Elevated priority, due to priority inheritance or prioritize().
|
||||
size_t dependencies_left = 0; // Current number of dependencies on pending jobs.
|
||||
UInt64 ready_seqno = 0; // Zero means that job is not in ready queue.
|
||||
LoadJobSet dependent_jobs; // Set of jobs dependent on this job.
|
||||
|
||||
// Three independent states of a non-finished job.
|
||||
bool is_blocked() const { return dependencies_left > 0; }
|
||||
bool is_ready() const { return dependencies_left == 0 && ready_seqno > 0; }
|
||||
bool is_executing() const { return dependencies_left == 0 && ready_seqno == 0; }
|
||||
|
||||
// Get key of a ready job
|
||||
ReadyKey key() const
|
||||
{
|
||||
return {.priority = priority, .initial_priority = initial_priority, .ready_seqno = ready_seqno};
|
||||
}
|
||||
// Three independent states of a scheduled job.
|
||||
bool isBlocked() const { return dependencies_left > 0; }
|
||||
bool isReady() const { return dependencies_left == 0 && ready_seqno > 0; }
|
||||
bool isExecuting() const { return dependencies_left == 0 && ready_seqno == 0; }
|
||||
};
|
||||
|
||||
public:
|
||||
using Metric = CurrentMetrics::Metric;
|
||||
|
||||
AsyncLoader(Metric metric_threads, Metric metric_active_threads, size_t max_threads_, bool log_failures_, bool log_progress_);
|
||||
// Helper struct for AsyncLoader construction
|
||||
struct PoolInitializer
|
||||
{
|
||||
String name;
|
||||
Metric metric_threads;
|
||||
Metric metric_active_threads;
|
||||
size_t max_threads;
|
||||
ssize_t priority;
|
||||
};
|
||||
|
||||
AsyncLoader(std::vector<PoolInitializer> pool_initializers, bool log_failures_, bool log_progress_);
|
||||
|
||||
// Stops AsyncLoader before destruction
|
||||
// WARNING: all tasks instances should be destructed before associated AsyncLoader.
|
||||
~AsyncLoader();
|
||||
|
||||
// Start workers to execute scheduled load jobs.
|
||||
// Start workers to execute scheduled load jobs. Note that AsyncLoader is constructed as already started.
|
||||
void start();
|
||||
|
||||
// Wait for all load jobs to finish, including all new jobs. So at first take care to stop adding new jobs.
|
||||
@ -356,28 +388,32 @@ public:
|
||||
// - or canceled using ~Task() or remove() later.
|
||||
void stop();
|
||||
|
||||
// Schedule all jobs of given `task` and their dependencies (if any, not scheduled yet).
|
||||
// Higher priority jobs (with greater `job->priority()` value) are executed earlier.
|
||||
// All dependencies of a scheduled job inherit its priority if it is higher. This way higher priority job
|
||||
// never wait for (blocked by) lower priority jobs. No priority inversion is possible.
|
||||
// Schedule all jobs of given `task` and their dependencies (even if they are not in task).
|
||||
// All dependencies of a scheduled job inherit its pool if it has higher priority. This way higher priority job
|
||||
// never waits for (blocked by) lower priority jobs. No priority inversion is possible.
|
||||
// Idempotent: multiple schedule() calls for the same job are no-op.
|
||||
// Note that `task` destructor ensures that all its jobs are finished (OK, FAILED or CANCELED)
|
||||
// and are removed from AsyncLoader, so it is thread-safe to destroy them.
|
||||
void schedule(LoadTask & task);
|
||||
void schedule(const LoadTaskPtr & task);
|
||||
|
||||
// Schedule all tasks atomically. To ensure only highest priority jobs among all tasks are run first.
|
||||
void schedule(const std::vector<LoadTaskPtr> & tasks);
|
||||
void schedule(const LoadTaskPtrs & tasks);
|
||||
|
||||
// Increase priority of a job and all its dependencies recursively.
|
||||
void prioritize(const LoadJobPtr & job, ssize_t new_priority);
|
||||
// Jobs from higher (than `new_pool`) priority pools are not changed.
|
||||
void prioritize(const LoadJobPtr & job, size_t new_pool);
|
||||
|
||||
// Remove finished jobs, cancel scheduled jobs, wait for executing jobs to finish and remove them.
|
||||
void remove(const LoadJobSet & jobs);
|
||||
|
||||
// Increase or decrease maximum number of simultaneously executing jobs.
|
||||
void setMaxThreads(size_t value);
|
||||
// Increase or decrease maximum number of simultaneously executing jobs in `pool`.
|
||||
void setMaxThreads(size_t pool, size_t value);
|
||||
|
||||
size_t getMaxThreads(size_t pool) const;
|
||||
const String & getPoolName(size_t pool) const;
|
||||
ssize_t getPoolPriority(size_t pool) const;
|
||||
|
||||
size_t getMaxThreads() const;
|
||||
size_t getScheduledJobCount() const;
|
||||
|
||||
// Helper class for introspection
|
||||
@ -385,11 +421,10 @@ public:
|
||||
{
|
||||
LoadJobPtr job;
|
||||
size_t dependencies_left = 0;
|
||||
bool is_executing = false;
|
||||
UInt64 ready_seqno = 0;
|
||||
bool is_blocked = false;
|
||||
bool is_ready = false;
|
||||
std::optional<ssize_t> initial_priority;
|
||||
std::optional<UInt64> ready_seqno;
|
||||
bool is_executing = false;
|
||||
};
|
||||
|
||||
// For introspection and debug only, see `system.async_loader` table
|
||||
@ -398,42 +433,32 @@ public:
|
||||
private:
|
||||
void checkCycle(const LoadJobSet & jobs, std::unique_lock<std::mutex> & lock);
|
||||
String checkCycleImpl(const LoadJobPtr & job, LoadJobSet & left, LoadJobSet & visited, std::unique_lock<std::mutex> & lock);
|
||||
void finish(std::unique_lock<std::mutex> & lock, const LoadJobPtr & job, LoadStatus status, std::exception_ptr exception_from_job = {});
|
||||
void finish(const LoadJobPtr & job, LoadStatus status, std::exception_ptr exception_from_job, std::unique_lock<std::mutex> & lock);
|
||||
void scheduleImpl(const LoadJobSet & input_jobs);
|
||||
void gatherNotScheduled(const LoadJobPtr & job, LoadJobSet & jobs, std::unique_lock<std::mutex> & lock);
|
||||
void prioritize(const LoadJobPtr & job, ssize_t new_priority, std::unique_lock<std::mutex> & lock);
|
||||
void prioritize(const LoadJobPtr & job, size_t new_pool_id, std::unique_lock<std::mutex> & lock);
|
||||
void enqueue(Info & info, const LoadJobPtr & job, std::unique_lock<std::mutex> & lock);
|
||||
void spawn(std::unique_lock<std::mutex> &);
|
||||
void worker();
|
||||
bool canSpawnWorker(Pool & pool, std::unique_lock<std::mutex> &);
|
||||
bool canWorkerLive(Pool & pool, std::unique_lock<std::mutex> &);
|
||||
void updateCurrentPriorityAndSpawn(std::unique_lock<std::mutex> &);
|
||||
void spawn(Pool & pool, std::unique_lock<std::mutex> &);
|
||||
void worker(Pool & pool);
|
||||
|
||||
// Logging
|
||||
const bool log_failures; // Worker should log all exceptions caught from job functions.
|
||||
const bool log_progress; // Periodically log total progress
|
||||
Poco::Logger * log;
|
||||
std::chrono::system_clock::time_point busy_period_start_time;
|
||||
AtomicStopwatch stopwatch;
|
||||
size_t old_jobs = 0; // Number of jobs that were finished in previous busy period (for correct progress indication)
|
||||
|
||||
mutable std::mutex mutex; // Guards all the fields below.
|
||||
bool is_running = false;
|
||||
|
||||
// Full set of scheduled pending jobs along with scheduling info.
|
||||
std::unordered_map<LoadJobPtr, Info> scheduled_jobs;
|
||||
|
||||
// Subset of scheduled pending non-blocked jobs (waiting for a worker to be executed).
|
||||
// Represent a queue of jobs in order of decreasing priority and FIFO for jobs with equal priorities.
|
||||
std::map<ReadyKey, LoadJobPtr> ready_queue;
|
||||
|
||||
// Set of finished jobs (for introspection only, until jobs are removed).
|
||||
LoadJobSet finished_jobs;
|
||||
|
||||
// Increasing counter for `ReadyKey` assignment (to preserve FIFO order of the jobs with equal priorities).
|
||||
UInt64 last_ready_seqno = 0;
|
||||
|
||||
// For executing jobs. Note that we avoid using an internal queue of the pool to be able to prioritize jobs.
|
||||
size_t max_threads;
|
||||
size_t workers = 0;
|
||||
ThreadPool pool;
|
||||
bool is_running = true;
|
||||
std::optional<ssize_t> current_priority; // highest priority among active pools
|
||||
UInt64 last_ready_seqno = 0; // Increasing counter for ready queue keys.
|
||||
std::unordered_map<LoadJobPtr, Info> scheduled_jobs; // Full set of scheduled pending jobs along with scheduling info.
|
||||
std::vector<Pool> pools; // Thread pools for job execution and ready queues
|
||||
LoadJobSet finished_jobs; // Set of finished jobs (for introspection only, until jobs are removed).
|
||||
AtomicStopwatch stopwatch; // For progress indication
|
||||
size_t old_jobs = 0; // Number of jobs that were finished in previous busy period (for correct progress indication)
|
||||
std::chrono::system_clock::time_point busy_period_start_time;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -30,6 +30,11 @@ namespace DB::ErrorCodes
|
||||
extern const int ASYNC_LOAD_CANCELED;
|
||||
}
|
||||
|
||||
struct Initializer {
|
||||
size_t max_threads = 1;
|
||||
ssize_t priority = 0;
|
||||
};
|
||||
|
||||
struct AsyncLoaderTest
|
||||
{
|
||||
AsyncLoader loader;
|
||||
@ -37,10 +42,34 @@ struct AsyncLoaderTest
|
||||
std::mutex rng_mutex;
|
||||
pcg64 rng{randomSeed()};
|
||||
|
||||
explicit AsyncLoaderTest(std::vector<Initializer> initializers)
|
||||
: loader(getPoolInitializers(initializers), /* log_failures = */ false, /* log_progress = */ false)
|
||||
{
|
||||
loader.stop(); // All tests call `start()` manually to better control ordering
|
||||
}
|
||||
|
||||
explicit AsyncLoaderTest(size_t max_threads = 1)
|
||||
: loader(CurrentMetrics::TablesLoaderThreads, CurrentMetrics::TablesLoaderThreadsActive, max_threads, /* log_failures = */ false, /* log_progress = */ false)
|
||||
: AsyncLoaderTest({{.max_threads = max_threads}})
|
||||
{}
|
||||
|
||||
std::vector<AsyncLoader::PoolInitializer> getPoolInitializers(std::vector<Initializer> initializers)
|
||||
{
|
||||
std::vector<AsyncLoader::PoolInitializer> result;
|
||||
size_t pool_id = 0;
|
||||
for (auto & desc : initializers)
|
||||
{
|
||||
result.push_back({
|
||||
.name = fmt::format("Pool{}", pool_id),
|
||||
.metric_threads = CurrentMetrics::TablesLoaderThreads,
|
||||
.metric_active_threads = CurrentMetrics::TablesLoaderThreadsActive,
|
||||
.max_threads = desc.max_threads,
|
||||
.priority = desc.priority
|
||||
});
|
||||
pool_id++;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
T randomInt(T from, T to)
|
||||
{
|
||||
@ -114,16 +143,19 @@ struct AsyncLoaderTest
|
||||
|
||||
TEST(AsyncLoader, Smoke)
|
||||
{
|
||||
AsyncLoaderTest t(2);
|
||||
AsyncLoaderTest t({
|
||||
{.max_threads = 2, .priority = 0},
|
||||
{.max_threads = 2, .priority = -1},
|
||||
});
|
||||
|
||||
static constexpr ssize_t low_priority = -1;
|
||||
static constexpr ssize_t low_priority_pool = 1;
|
||||
|
||||
std::atomic<size_t> jobs_done{0};
|
||||
std::atomic<size_t> low_priority_jobs_done{0};
|
||||
|
||||
auto job_func = [&] (const LoadJobPtr & self) {
|
||||
jobs_done++;
|
||||
if (self->priority() == low_priority)
|
||||
if (self->pool() == low_priority_pool)
|
||||
low_priority_jobs_done++;
|
||||
};
|
||||
|
||||
@ -135,7 +167,7 @@ TEST(AsyncLoader, Smoke)
|
||||
auto job3 = makeLoadJob({ job2 }, "job3", job_func);
|
||||
auto job4 = makeLoadJob({ job2 }, "job4", job_func);
|
||||
auto task2 = t.schedule({ job3, job4 });
|
||||
auto job5 = makeLoadJob({ job3, job4 }, low_priority, "job5", job_func);
|
||||
auto job5 = makeLoadJob({ job3, job4 }, low_priority_pool, "job5", job_func);
|
||||
task2->merge(t.schedule({ job5 }));
|
||||
|
||||
std::thread waiter_thread([=] { job5->wait(); });
|
||||
@ -536,7 +568,7 @@ TEST(AsyncLoader, TestOverload)
|
||||
AsyncLoaderTest t(3);
|
||||
t.loader.start();
|
||||
|
||||
size_t max_threads = t.loader.getMaxThreads();
|
||||
size_t max_threads = t.loader.getMaxThreads(/* pool = */ 0);
|
||||
std::atomic<int> executing{0};
|
||||
|
||||
for (int concurrency = 4; concurrency <= 8; concurrency++)
|
||||
@ -562,13 +594,24 @@ TEST(AsyncLoader, TestOverload)
|
||||
|
||||
TEST(AsyncLoader, StaticPriorities)
|
||||
{
|
||||
AsyncLoaderTest t(1);
|
||||
AsyncLoaderTest t({
|
||||
{.max_threads = 1, .priority = 0},
|
||||
{.max_threads = 1, .priority = 1},
|
||||
{.max_threads = 1, .priority = 2},
|
||||
{.max_threads = 1, .priority = 3},
|
||||
{.max_threads = 1, .priority = 4},
|
||||
{.max_threads = 1, .priority = 5},
|
||||
{.max_threads = 1, .priority = 6},
|
||||
{.max_threads = 1, .priority = 7},
|
||||
{.max_threads = 1, .priority = 8},
|
||||
{.max_threads = 1, .priority = 9},
|
||||
});
|
||||
|
||||
std::string schedule;
|
||||
|
||||
auto job_func = [&] (const LoadJobPtr & self)
|
||||
{
|
||||
schedule += fmt::format("{}{}", self->name, self->priority());
|
||||
schedule += fmt::format("{}{}", self->name, self->pool());
|
||||
};
|
||||
|
||||
std::vector<LoadJobPtr> jobs;
|
||||
@ -588,21 +631,110 @@ TEST(AsyncLoader, StaticPriorities)
|
||||
ASSERT_EQ(schedule, "A9E9D9F9G9H9C4B3");
|
||||
}
|
||||
|
||||
TEST(AsyncLoader, SimplePrioritization)
|
||||
{
|
||||
AsyncLoaderTest t({
|
||||
{.max_threads = 1, .priority = 0},
|
||||
{.max_threads = 1, .priority = 1},
|
||||
{.max_threads = 1, .priority = 2},
|
||||
});
|
||||
|
||||
t.loader.start();
|
||||
|
||||
std::atomic<int> executed{0}; // Number of previously executed jobs (to test execution order)
|
||||
LoadJobPtr job_to_prioritize;
|
||||
|
||||
auto job_func_A_booster = [&] (const LoadJobPtr &)
|
||||
{
|
||||
ASSERT_EQ(executed++, 0);
|
||||
t.loader.prioritize(job_to_prioritize, 2);
|
||||
};
|
||||
|
||||
auto job_func_B_tester = [&] (const LoadJobPtr &)
|
||||
{
|
||||
ASSERT_EQ(executed++, 2);
|
||||
};
|
||||
|
||||
auto job_func_C_boosted = [&] (const LoadJobPtr &)
|
||||
{
|
||||
ASSERT_EQ(executed++, 1);
|
||||
};
|
||||
|
||||
std::vector<LoadJobPtr> jobs;
|
||||
jobs.push_back(makeLoadJob({}, 1, "A", job_func_A_booster)); // 0
|
||||
jobs.push_back(makeLoadJob({jobs[0]}, 1, "B", job_func_B_tester)); // 1
|
||||
jobs.push_back(makeLoadJob({}, 0, "C", job_func_C_boosted)); // 2
|
||||
auto task = makeLoadTask(t.loader, { jobs.begin(), jobs.end() });
|
||||
|
||||
job_to_prioritize = jobs[2]; // C
|
||||
|
||||
scheduleAndWaitLoadAll(task);
|
||||
}
|
||||
|
||||
TEST(AsyncLoader, DynamicPriorities)
|
||||
{
|
||||
AsyncLoaderTest t(1);
|
||||
AsyncLoaderTest t({
|
||||
{.max_threads = 1, .priority = 0},
|
||||
{.max_threads = 1, .priority = 1},
|
||||
{.max_threads = 1, .priority = 2},
|
||||
{.max_threads = 1, .priority = 3},
|
||||
{.max_threads = 1, .priority = 4},
|
||||
{.max_threads = 1, .priority = 5},
|
||||
{.max_threads = 1, .priority = 6},
|
||||
{.max_threads = 1, .priority = 7},
|
||||
{.max_threads = 1, .priority = 8},
|
||||
{.max_threads = 1, .priority = 9},
|
||||
});
|
||||
|
||||
for (bool prioritize : {false, true})
|
||||
{
|
||||
// Although all pools have max_threads=1, workers from different pools can run simultaneously just after `prioritize()` call
|
||||
std::barrier sync(2);
|
||||
bool wait_sync = prioritize;
|
||||
std::mutex schedule_mutex;
|
||||
std::string schedule;
|
||||
|
||||
LoadJobPtr job_to_prioritize;
|
||||
|
||||
// Order of execution of jobs D and E after prioritization is undefined, because it depend on `ready_seqno`
|
||||
// (Which depends on initial `schedule()` order, which in turn depend on `std::unordered_map` order)
|
||||
// So we have to obtain `ready_seqno` to be sure.
|
||||
UInt64 ready_seqno_D = 0;
|
||||
UInt64 ready_seqno_E = 0;
|
||||
|
||||
auto job_func = [&] (const LoadJobPtr & self)
|
||||
{
|
||||
{
|
||||
std::unique_lock lock{schedule_mutex};
|
||||
schedule += fmt::format("{}{}", self->name, self->executionPool());
|
||||
}
|
||||
|
||||
if (prioritize && self->name == "C")
|
||||
t.loader.prioritize(job_to_prioritize, 9); // dynamic prioritization
|
||||
schedule += fmt::format("{}{}", self->name, self->priority());
|
||||
{
|
||||
for (const auto & state : t.loader.getJobStates())
|
||||
{
|
||||
if (state.job->name == "D")
|
||||
ready_seqno_D = state.ready_seqno;
|
||||
if (state.job->name == "E")
|
||||
ready_seqno_E = state.ready_seqno;
|
||||
}
|
||||
|
||||
// Jobs D and E should be enqueued at the moment
|
||||
ASSERT_LT(0, ready_seqno_D);
|
||||
ASSERT_LT(0, ready_seqno_E);
|
||||
|
||||
// Dynamic prioritization G0 -> G9
|
||||
// Note that it will spawn concurrent worker in higher priority pool
|
||||
t.loader.prioritize(job_to_prioritize, 9);
|
||||
|
||||
sync.arrive_and_wait(); // (A) wait for higher priority worker (B) to test they can be concurrent
|
||||
}
|
||||
|
||||
if (wait_sync && (self->name == "D" || self->name == "E"))
|
||||
{
|
||||
wait_sync = false;
|
||||
sync.arrive_and_wait(); // (B)
|
||||
}
|
||||
};
|
||||
|
||||
// Job DAG with initial priorities. During execution of C4, job G0 priority is increased to G9, postponing B3 job executing.
|
||||
@ -624,14 +756,19 @@ TEST(AsyncLoader, DynamicPriorities)
|
||||
jobs.push_back(makeLoadJob({ jobs[6] }, 0, "H", job_func)); // 7
|
||||
auto task = t.schedule({ jobs.begin(), jobs.end() });
|
||||
|
||||
job_to_prioritize = jobs[6];
|
||||
job_to_prioritize = jobs[6]; // G
|
||||
|
||||
t.loader.start();
|
||||
t.loader.wait();
|
||||
t.loader.stop();
|
||||
|
||||
if (prioritize)
|
||||
ASSERT_EQ(schedule, "A4C4E9D9F9G9B3H0");
|
||||
{
|
||||
if (ready_seqno_D < ready_seqno_E)
|
||||
ASSERT_EQ(schedule, "A4C4D9E9F9G9B3H0");
|
||||
else
|
||||
ASSERT_EQ(schedule, "A4C4E9D9F9G9B3H0");
|
||||
}
|
||||
else
|
||||
ASSERT_EQ(schedule, "A4C4B3E2D1F0G0H0");
|
||||
}
|
||||
@ -742,8 +879,64 @@ TEST(AsyncLoader, SetMaxThreads)
|
||||
syncs[idx]->arrive_and_wait(); // (A)
|
||||
sync_index++;
|
||||
if (sync_index < syncs.size())
|
||||
t.loader.setMaxThreads(max_threads_values[sync_index]);
|
||||
t.loader.setMaxThreads(/* pool = */ 0, max_threads_values[sync_index]);
|
||||
syncs[idx]->arrive_and_wait(); // (B) this sync point is required to allow `executing` value to go back down to zero after we change number of workers
|
||||
}
|
||||
t.loader.wait();
|
||||
}
|
||||
|
||||
TEST(AsyncLoader, DynamicPools)
|
||||
{
|
||||
const size_t max_threads[] { 2, 10 };
|
||||
const int jobs_in_chain = 16;
|
||||
AsyncLoaderTest t({
|
||||
{.max_threads = max_threads[0], .priority = 0},
|
||||
{.max_threads = max_threads[1], .priority = 1},
|
||||
});
|
||||
|
||||
t.loader.start();
|
||||
|
||||
std::atomic<size_t> executing[2] { 0, 0 }; // Number of currently executing jobs per pool
|
||||
|
||||
for (int concurrency = 1; concurrency <= 12; concurrency++)
|
||||
{
|
||||
std::atomic<bool> boosted{false}; // Visible concurrency was increased
|
||||
std::atomic<int> left{concurrency * jobs_in_chain / 2}; // Number of jobs to start before `prioritize()` call
|
||||
|
||||
LoadJobSet jobs_to_prioritize;
|
||||
|
||||
auto job_func = [&] (const LoadJobPtr & self)
|
||||
{
|
||||
auto pool_id = self->executionPool();
|
||||
executing[pool_id]++;
|
||||
if (executing[pool_id] > max_threads[0])
|
||||
boosted = true;
|
||||
ASSERT_LE(executing[pool_id], max_threads[pool_id]);
|
||||
|
||||
// Dynamic prioritization
|
||||
if (--left == 0)
|
||||
{
|
||||
for (const auto & job : jobs_to_prioritize)
|
||||
t.loader.prioritize(job, 1);
|
||||
}
|
||||
|
||||
t.randomSleepUs(100, 200, 100);
|
||||
|
||||
ASSERT_LE(executing[pool_id], max_threads[pool_id]);
|
||||
executing[pool_id]--;
|
||||
};
|
||||
|
||||
std::vector<LoadTaskPtr> tasks;
|
||||
tasks.reserve(concurrency);
|
||||
for (int i = 0; i < concurrency; i++)
|
||||
tasks.push_back(makeLoadTask(t.loader, t.chainJobSet(jobs_in_chain, job_func)));
|
||||
jobs_to_prioritize = getGoals(tasks); // All jobs
|
||||
scheduleAndWaitLoadAll(tasks);
|
||||
|
||||
ASSERT_EQ(executing[0], 0);
|
||||
ASSERT_EQ(executing[1], 0);
|
||||
ASSERT_EQ(boosted, concurrency > 2);
|
||||
boosted = false;
|
||||
}
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user