mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-19 16:20:50 +00:00
Merge pull request #48923 from ClickHouse/async-loader
Add AsyncLoader with dependency tracking and runtime prioritization
This commit is contained in:
commit
8f20085d9a
640
src/Common/AsyncLoader.cpp
Normal file
640
src/Common/AsyncLoader.cpp
Normal file
@ -0,0 +1,640 @@
|
||||
#include <Common/AsyncLoader.h>
|
||||
|
||||
#include <base/defines.h>
|
||||
#include <Common/ErrorCodes.h>
|
||||
#include <Common/Exception.h>
|
||||
#include <Common/noexcept_scope.h>
|
||||
#include <Common/setThreadName.h>
|
||||
#include <Common/logger_useful.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
// Error codes raised by the loader. They are only declared here (`extern`); the definitions live elsewhere.
namespace ErrorCodes
{
    extern const int ASYNC_LOAD_CYCLE;    // thrown when a dependency cycle is detected
    extern const int ASYNC_LOAD_FAILED;   // wraps an exception thrown by a job function
    extern const int ASYNC_LOAD_CANCELED; // stored in jobs that are canceled
}
|
||||
|
||||
// Throttling parameters of `logAboutProgress()`.
// A progress message is printed for every N-th processed object ...
static constexpr size_t PRINT_MESSAGE_EACH_N_OBJECTS = 256;
// ... or when the stopwatch check with this threshold (in seconds) succeeds.
static constexpr size_t PRINT_MESSAGE_EACH_N_SECONDS = 5;
|
||||
|
||||
// Logs loading progress as a percentage of `processed` out of `total`.
// The message is throttled: it is printed only when `processed` is a multiple of PRINT_MESSAGE_EACH_N_OBJECTS
// or when `watch.compareAndRestart(PRINT_MESSAGE_EACH_N_SECONDS)` succeeds. The watch is restarted after every message.
void logAboutProgress(Poco::Logger * log, size_t processed, size_t total, AtomicStopwatch & watch)
{
    const bool at_object_boundary = (processed % PRINT_MESSAGE_EACH_N_OBJECTS == 0);

    // The stopwatch is consulted only when the object counter did not trigger the message
    if (!at_object_boundary && !watch.compareAndRestart(PRINT_MESSAGE_EACH_N_SECONDS))
        return;

    LOG_INFO(log, "Processed: {}%", processed * 100.0 / total);
    watch.restart();
}
|
||||
|
||||
// Returns the current status of the job. The value is read under the job mutex.
LoadStatus LoadJob::status() const
{
    std::unique_lock guard{mutex};
    return load_status;
}
|
||||
|
||||
std::exception_ptr LoadJob::exception() const
|
||||
{
|
||||
std::unique_lock lock{mutex};
|
||||
return load_exception;
|
||||
}
|
||||
|
||||
// Returns the current priority of the job. The field is atomic, so no lock is taken.
ssize_t LoadJob::priority() const
{
    return load_priority.load();
}
|
||||
|
||||
void LoadJob::wait() const
|
||||
{
|
||||
std::unique_lock lock{mutex};
|
||||
waiters++;
|
||||
finished.wait(lock, [this] { return load_status != LoadStatus::PENDING; });
|
||||
waiters--;
|
||||
if (load_exception)
|
||||
std::rethrow_exception(load_exception);
|
||||
}
|
||||
|
||||
void LoadJob::waitNoThrow() const noexcept
|
||||
{
|
||||
std::unique_lock lock{mutex};
|
||||
waiters++;
|
||||
finished.wait(lock, [this] { return load_status != LoadStatus::PENDING; });
|
||||
waiters--;
|
||||
}
|
||||
|
||||
size_t LoadJob::waitersCount() const
|
||||
{
|
||||
std::unique_lock lock{mutex};
|
||||
return waiters;
|
||||
}
|
||||
|
||||
void LoadJob::ok()
|
||||
{
|
||||
std::unique_lock lock{mutex};
|
||||
load_status = LoadStatus::OK;
|
||||
finish();
|
||||
}
|
||||
|
||||
void LoadJob::failed(const std::exception_ptr & ptr)
|
||||
{
|
||||
std::unique_lock lock{mutex};
|
||||
load_status = LoadStatus::FAILED;
|
||||
load_exception = ptr;
|
||||
finish();
|
||||
}
|
||||
|
||||
void LoadJob::canceled(const std::exception_ptr & ptr)
|
||||
{
|
||||
std::unique_lock lock{mutex};
|
||||
load_status = LoadStatus::CANCELED;
|
||||
load_exception = ptr;
|
||||
finish();
|
||||
}
|
||||
|
||||
void LoadJob::finish()
|
||||
{
|
||||
func = {}; // To ensure job function is destructed before `AsyncLoader::wait()` and `LoadJob::wait()` return
|
||||
finish_time = std::chrono::system_clock::now();
|
||||
if (waiters > 0)
|
||||
finished.notify_all();
|
||||
}
|
||||
|
||||
void LoadJob::scheduled()
|
||||
{
|
||||
schedule_time = std::chrono::system_clock::now();
|
||||
}
|
||||
|
||||
void LoadJob::enqueued()
|
||||
{
|
||||
if (enqueue_time.load() == TimePoint{}) // Do not rewrite in case of requeue
|
||||
enqueue_time = std::chrono::system_clock::now();
|
||||
}
|
||||
|
||||
void LoadJob::execute(const LoadJobPtr & self)
|
||||
{
|
||||
start_time = std::chrono::system_clock::now();
|
||||
func(self);
|
||||
}
|
||||
|
||||
|
||||
// Creates a task bound to `loader_` and takes ownership of the given job sets.
LoadTask::LoadTask(AsyncLoader & loader_, LoadJobSet && jobs_, LoadJobSet && goal_jobs_)
    : loader(loader_), jobs(std::move(jobs_)), goal_jobs(std::move(goal_jobs_))
{
}
|
||||
|
||||
// Removes all jobs that are still tracked by this task from the loader.
LoadTask::~LoadTask()
{
    remove();
}
|
||||
|
||||
void LoadTask::merge(const LoadTaskPtr & task)
|
||||
{
|
||||
chassert(&loader == &task->loader);
|
||||
jobs.merge(task->jobs);
|
||||
goal_jobs.merge(task->goal_jobs);
|
||||
}
|
||||
|
||||
void LoadTask::schedule()
|
||||
{
|
||||
loader.schedule(*this);
|
||||
}
|
||||
|
||||
void LoadTask::remove()
|
||||
{
|
||||
if (!jobs.empty())
|
||||
{
|
||||
loader.remove(jobs);
|
||||
jobs.clear();
|
||||
}
|
||||
}
|
||||
|
||||
void LoadTask::detach()
|
||||
{
|
||||
jobs.clear();
|
||||
}
|
||||
|
||||
// Creates a loader with its own thread pool limited to `max_threads_` threads.
// No worker is spawned here; see `start()`.
AsyncLoader::AsyncLoader(Metric metric_threads, Metric metric_active_threads, size_t max_threads_, bool log_failures_, bool log_progress_)
    : log_failures(log_failures_)
    , log_progress(log_progress_)
    , log(&Poco::Logger::get("AsyncLoader"))
    , max_threads(max_threads_)
    , pool(metric_threads, metric_active_threads, max_threads)
{
}
|
||||
|
||||
// Stops the loader, see `stop()`.
AsyncLoader::~AsyncLoader()
{
    stop();
}
|
||||
|
||||
void AsyncLoader::start()
|
||||
{
|
||||
std::unique_lock lock{mutex};
|
||||
is_running = true;
|
||||
for (size_t i = 0; workers < max_threads && i < ready_queue.size(); i++)
|
||||
spawn(lock);
|
||||
}
|
||||
|
||||
void AsyncLoader::wait()
|
||||
{
|
||||
pool.wait();
|
||||
}
|
||||
|
||||
void AsyncLoader::stop()
|
||||
{
|
||||
{
|
||||
std::unique_lock lock{mutex};
|
||||
is_running = false;
|
||||
// NOTE: there is no need to notify because workers never wait
|
||||
}
|
||||
pool.wait();
|
||||
}
|
||||
|
||||
// Schedules all jobs of the task. The task must belong to this loader.
void AsyncLoader::schedule(LoadTask & task)
{
    chassert(this == &task.loader);

    const LoadJobSet & task_jobs = task.jobs;
    scheduleImpl(task_jobs);
}
|
||||
|
||||
void AsyncLoader::schedule(const LoadTaskPtr & task)
|
||||
{
|
||||
chassert(this == &task->loader);
|
||||
scheduleImpl(task->jobs);
|
||||
}
|
||||
|
||||
void AsyncLoader::schedule(const std::vector<LoadTaskPtr> & tasks)
|
||||
{
|
||||
LoadJobSet all_jobs;
|
||||
for (const auto & task : tasks)
|
||||
{
|
||||
chassert(this == &task->loader);
|
||||
all_jobs.insert(task->jobs.begin(), task->jobs.end());
|
||||
}
|
||||
scheduleImpl(all_jobs);
|
||||
}
|
||||
|
||||
void AsyncLoader::scheduleImpl(const LoadJobSet & input_jobs)
|
||||
{
|
||||
std::unique_lock lock{mutex};
|
||||
|
||||
// Restart watches after idle period
|
||||
if (scheduled_jobs.empty())
|
||||
{
|
||||
busy_period_start_time = std::chrono::system_clock::now();
|
||||
stopwatch.restart();
|
||||
old_jobs = finished_jobs.size();
|
||||
}
|
||||
|
||||
// Make set of jobs to schedule:
|
||||
// 1) exclude already scheduled or finished jobs
|
||||
// 2) include pending dependencies, that are not yet scheduled
|
||||
LoadJobSet jobs;
|
||||
for (const auto & job : input_jobs)
|
||||
gatherNotScheduled(job, jobs, lock);
|
||||
|
||||
// Ensure scheduled_jobs graph will have no cycles. The only way to get a cycle is to add a cycle, assuming old jobs cannot reference new ones.
|
||||
checkCycle(jobs, lock);
|
||||
|
||||
// We do not want any exception to be throws after this point, because the following code is not exception-safe
|
||||
DENY_ALLOCATIONS_IN_SCOPE;
|
||||
|
||||
// Schedule all incoming jobs
|
||||
for (const auto & job : jobs)
|
||||
{
|
||||
NOEXCEPT_SCOPE({
|
||||
ALLOW_ALLOCATIONS_IN_SCOPE;
|
||||
scheduled_jobs.emplace(job, Info{.initial_priority = job->load_priority, .priority = job->load_priority});
|
||||
job->scheduled();
|
||||
});
|
||||
}
|
||||
|
||||
// Process dependencies on scheduled pending jobs
|
||||
for (const auto & job : jobs)
|
||||
{
|
||||
Info & info = scheduled_jobs.find(job)->second;
|
||||
for (const auto & dep : job->dependencies)
|
||||
{
|
||||
// Register every dependency on scheduled job with back-link to dependent job
|
||||
if (auto dep_info = scheduled_jobs.find(dep); dep_info != scheduled_jobs.end())
|
||||
{
|
||||
NOEXCEPT_SCOPE({
|
||||
ALLOW_ALLOCATIONS_IN_SCOPE;
|
||||
dep_info->second.dependent_jobs.insert(job);
|
||||
});
|
||||
info.dependencies_left++;
|
||||
|
||||
// Priority inheritance: prioritize deps to have at least given `priority` to avoid priority inversion
|
||||
prioritize(dep, info.priority, lock);
|
||||
}
|
||||
}
|
||||
|
||||
// Enqueue non-blocked jobs (w/o dependencies) to ready queue
|
||||
if (!info.is_blocked())
|
||||
enqueue(info, job, lock);
|
||||
}
|
||||
|
||||
// Process dependencies on other jobs. It is done in a separate pass to facilitate propagation of cancel signals (if any).
|
||||
for (const auto & job : jobs)
|
||||
{
|
||||
if (auto info = scheduled_jobs.find(job); info != scheduled_jobs.end())
|
||||
{
|
||||
for (const auto & dep : job->dependencies)
|
||||
{
|
||||
if (scheduled_jobs.contains(dep))
|
||||
continue; // Skip dependencies on scheduled pending jobs (already processed)
|
||||
LoadStatus dep_status = dep->status();
|
||||
if (dep_status == LoadStatus::OK)
|
||||
continue; // Dependency on already successfully finished job -- it's okay.
|
||||
|
||||
// Dependency on not scheduled pending job -- it's bad.
|
||||
// Probably, there is an error in `jobs` set, `gatherNotScheduled()` should have fixed it.
|
||||
chassert(dep_status != LoadStatus::PENDING);
|
||||
|
||||
if (dep_status == LoadStatus::FAILED || dep_status == LoadStatus::CANCELED)
|
||||
{
|
||||
// Dependency on already failed or canceled job -- it's okay. Cancel all dependent jobs.
|
||||
std::exception_ptr e;
|
||||
NOEXCEPT_SCOPE({
|
||||
ALLOW_ALLOCATIONS_IN_SCOPE;
|
||||
e = std::make_exception_ptr(Exception(ErrorCodes::ASYNC_LOAD_CANCELED,
|
||||
"Load job '{}' -> {}",
|
||||
job->name,
|
||||
getExceptionMessage(dep->exception(), /* with_stacktrace = */ false)));
|
||||
});
|
||||
finish(lock, job, LoadStatus::CANCELED, e);
|
||||
break; // This job is now finished, stop its dependencies processing
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// Job was already canceled on previous iteration of this cycle -- skip
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void AsyncLoader::gatherNotScheduled(const LoadJobPtr & job, LoadJobSet & jobs, std::unique_lock<std::mutex> & lock)
|
||||
{
|
||||
if (job->status() == LoadStatus::PENDING && !scheduled_jobs.contains(job) && !jobs.contains(job))
|
||||
{
|
||||
jobs.insert(job);
|
||||
for (const auto & dep : job->dependencies)
|
||||
gatherNotScheduled(dep, jobs, lock);
|
||||
}
|
||||
}
|
||||
|
||||
void AsyncLoader::prioritize(const LoadJobPtr & job, ssize_t new_priority)
|
||||
{
|
||||
if (!job)
|
||||
return;
|
||||
DENY_ALLOCATIONS_IN_SCOPE;
|
||||
std::unique_lock lock{mutex};
|
||||
prioritize(job, new_priority, lock);
|
||||
}
|
||||
|
||||
void AsyncLoader::remove(const LoadJobSet & jobs)
|
||||
{
|
||||
DENY_ALLOCATIONS_IN_SCOPE;
|
||||
std::unique_lock lock{mutex};
|
||||
// On the first pass:
|
||||
// - cancel all not executing jobs to avoid races
|
||||
// - do not wait executing jobs (otherwise, on unlock a worker could start executing a dependent job, that should be canceled)
|
||||
for (const auto & job : jobs)
|
||||
{
|
||||
if (auto info = scheduled_jobs.find(job); info != scheduled_jobs.end())
|
||||
{
|
||||
if (info->second.is_executing())
|
||||
continue; // Skip executing jobs on the first pass
|
||||
std::exception_ptr e;
|
||||
NOEXCEPT_SCOPE({
|
||||
ALLOW_ALLOCATIONS_IN_SCOPE;
|
||||
e = std::make_exception_ptr(Exception(ErrorCodes::ASYNC_LOAD_CANCELED, "Load job '{}' canceled", job->name));
|
||||
});
|
||||
finish(lock, job, LoadStatus::CANCELED, e);
|
||||
}
|
||||
}
|
||||
// On the second pass wait for executing jobs to finish
|
||||
for (const auto & job : jobs)
|
||||
{
|
||||
if (auto info = scheduled_jobs.find(job); info != scheduled_jobs.end())
|
||||
{
|
||||
// Job is currently executing
|
||||
chassert(info->second.is_executing());
|
||||
lock.unlock();
|
||||
job->waitNoThrow(); // Wait for job to finish
|
||||
lock.lock();
|
||||
}
|
||||
}
|
||||
// On the third pass all jobs are finished - remove them all
|
||||
// It is better to do it under one lock to avoid exposing intermediate states
|
||||
for (const auto & job : jobs)
|
||||
{
|
||||
size_t erased = finished_jobs.erase(job);
|
||||
if (old_jobs >= erased && job->finishTime() != LoadJob::TimePoint{} && job->finishTime() < busy_period_start_time)
|
||||
old_jobs -= erased;
|
||||
}
|
||||
}
|
||||
|
||||
void AsyncLoader::setMaxThreads(size_t value)
|
||||
{
|
||||
std::unique_lock lock{mutex};
|
||||
pool.setMaxThreads(value);
|
||||
pool.setMaxFreeThreads(value);
|
||||
pool.setQueueSize(value);
|
||||
max_threads = value;
|
||||
if (!is_running)
|
||||
return;
|
||||
for (size_t i = 0; workers < max_threads && i < ready_queue.size(); i++)
|
||||
spawn(lock);
|
||||
}
|
||||
|
||||
size_t AsyncLoader::getMaxThreads() const
|
||||
{
|
||||
std::unique_lock lock{mutex};
|
||||
return max_threads;
|
||||
}
|
||||
|
||||
size_t AsyncLoader::getScheduledJobCount() const
|
||||
{
|
||||
std::unique_lock lock{mutex};
|
||||
return scheduled_jobs.size();
|
||||
}
|
||||
|
||||
std::vector<AsyncLoader::JobState> AsyncLoader::getJobStates() const
|
||||
{
|
||||
std::unique_lock lock{mutex};
|
||||
std::multimap<String, JobState> states;
|
||||
for (const auto & [job, info] : scheduled_jobs)
|
||||
states.emplace(job->name, JobState{
|
||||
.job = job,
|
||||
.dependencies_left = info.dependencies_left,
|
||||
.is_executing = info.is_executing(),
|
||||
.is_blocked = info.is_blocked(),
|
||||
.is_ready = info.is_ready(),
|
||||
.initial_priority = info.initial_priority,
|
||||
.ready_seqno = last_ready_seqno
|
||||
});
|
||||
for (const auto & job : finished_jobs)
|
||||
states.emplace(job->name, JobState{.job = job});
|
||||
lock.unlock();
|
||||
std::vector<JobState> result;
|
||||
for (auto && [_, state] : states)
|
||||
result.emplace_back(std::move(state));
|
||||
return result;
|
||||
}
|
||||
|
||||
void AsyncLoader::checkCycle(const LoadJobSet & jobs, std::unique_lock<std::mutex> & lock)
|
||||
{
|
||||
LoadJobSet left = jobs;
|
||||
LoadJobSet visited;
|
||||
visited.reserve(left.size());
|
||||
while (!left.empty())
|
||||
{
|
||||
LoadJobPtr job = *left.begin();
|
||||
checkCycleImpl(job, left, visited, lock);
|
||||
}
|
||||
}
|
||||
|
||||
// Depth-first traversal of dependencies of `job` restricted to jobs from `left`.
// Returns an empty string if no cycle passes through `job`; in this case `job` is erased from `left`.
// When a job is reached for the second time, its name is returned and it is erased from `visited` to mark
// where the cycle ends. While the recursion unwinds, the chain of names is built up to that marked job,
// where an ASYNC_LOAD_CYCLE exception with the whole chain is thrown.
String AsyncLoader::checkCycleImpl(const LoadJobPtr & job, LoadJobSet & left, LoadJobSet & visited, std::unique_lock<std::mutex> & lock)
{
    if (!left.contains(job))
        return {}; // Do not consider external dependencies and already processed jobs

    const bool first_visit = visited.insert(job).second;
    if (!first_visit)
    {
        visited.erase(job); // Mark where cycle ends
        return job->name;
    }

    for (const auto & dep : job->dependencies)
    {
        auto chain = checkCycleImpl(dep, left, visited, lock);
        if (chain.empty())
            continue;

        if (visited.contains(job))
            return fmt::format("{} -> {}", job->name, chain); // chain is not a cycle yet -- continue building

        // This job was unmarked above, so the chain has returned to the cycle end
        throw Exception(ErrorCodes::ASYNC_LOAD_CYCLE, "Load job dependency cycle detected: {} -> {}", job->name, chain);
    }

    left.erase(job);
    return {};
}
|
||||
|
||||
void AsyncLoader::finish(std::unique_lock<std::mutex> & lock, const LoadJobPtr & job, LoadStatus status, std::exception_ptr exception_from_job)
|
||||
{
|
||||
if (status == LoadStatus::OK)
|
||||
{
|
||||
// Notify waiters
|
||||
job->ok();
|
||||
|
||||
// Update dependent jobs and enqueue if ready
|
||||
chassert(scheduled_jobs.contains(job)); // Job was pending
|
||||
for (const auto & dep : scheduled_jobs[job].dependent_jobs)
|
||||
{
|
||||
chassert(scheduled_jobs.contains(dep)); // All depended jobs must be pending
|
||||
Info & dep_info = scheduled_jobs[dep];
|
||||
dep_info.dependencies_left--;
|
||||
if (!dep_info.is_blocked())
|
||||
enqueue(dep_info, dep, lock);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// Notify waiters
|
||||
if (status == LoadStatus::FAILED)
|
||||
job->failed(exception_from_job);
|
||||
else if (status == LoadStatus::CANCELED)
|
||||
job->canceled(exception_from_job);
|
||||
|
||||
chassert(scheduled_jobs.contains(job)); // Job was pending
|
||||
Info & info = scheduled_jobs[job];
|
||||
if (info.is_ready())
|
||||
{
|
||||
ready_queue.erase(info.key());
|
||||
info.ready_seqno = 0;
|
||||
}
|
||||
|
||||
// Recurse into all dependent jobs
|
||||
LoadJobSet dependent;
|
||||
dependent.swap(info.dependent_jobs); // To avoid container modification during recursion
|
||||
for (const auto & dep : dependent)
|
||||
{
|
||||
if (!scheduled_jobs.contains(dep))
|
||||
continue; // Job has already been canceled
|
||||
std::exception_ptr e;
|
||||
NOEXCEPT_SCOPE({
|
||||
ALLOW_ALLOCATIONS_IN_SCOPE;
|
||||
e = std::make_exception_ptr(
|
||||
Exception(ErrorCodes::ASYNC_LOAD_CANCELED,
|
||||
"Load job '{}' -> {}",
|
||||
dep->name,
|
||||
getExceptionMessage(exception_from_job, /* with_stacktrace = */ false)));
|
||||
});
|
||||
finish(lock, dep, LoadStatus::CANCELED, e);
|
||||
}
|
||||
|
||||
// Clean dependency graph edges pointing to canceled jobs
|
||||
for (const auto & dep : job->dependencies)
|
||||
if (auto dep_info = scheduled_jobs.find(dep); dep_info != scheduled_jobs.end())
|
||||
dep_info->second.dependent_jobs.erase(job);
|
||||
}
|
||||
|
||||
// Job became finished
|
||||
scheduled_jobs.erase(job);
|
||||
NOEXCEPT_SCOPE({
|
||||
ALLOW_ALLOCATIONS_IN_SCOPE;
|
||||
finished_jobs.insert(job);
|
||||
if (log_progress)
|
||||
logAboutProgress(log, finished_jobs.size() - old_jobs, finished_jobs.size() + scheduled_jobs.size() - old_jobs, stopwatch);
|
||||
});
|
||||
}
|
||||
|
||||
void AsyncLoader::prioritize(const LoadJobPtr & job, ssize_t new_priority, std::unique_lock<std::mutex> & lock)
|
||||
{
|
||||
if (auto info = scheduled_jobs.find(job); info != scheduled_jobs.end())
|
||||
{
|
||||
if (info->second.priority >= new_priority)
|
||||
return; // Never lower priority
|
||||
|
||||
// Update priority and push job forward through ready queue if needed
|
||||
if (info->second.ready_seqno)
|
||||
ready_queue.erase(info->second.key());
|
||||
info->second.priority = new_priority;
|
||||
job->load_priority.store(new_priority); // Set user-facing priority (may affect executing jobs)
|
||||
if (info->second.ready_seqno)
|
||||
{
|
||||
NOEXCEPT_SCOPE({
|
||||
ALLOW_ALLOCATIONS_IN_SCOPE;
|
||||
ready_queue.emplace(info->second.key(), job);
|
||||
});
|
||||
}
|
||||
|
||||
// Recurse into dependencies
|
||||
for (const auto & dep : job->dependencies)
|
||||
prioritize(dep, new_priority, lock);
|
||||
}
|
||||
}
|
||||
|
||||
// Puts a non-blocked job into the ready queue under a fresh sequence number
// and spawns one more worker if the loader is running and the worker limit allows it.
void AsyncLoader::enqueue(Info & info, const LoadJobPtr & job, std::unique_lock<std::mutex> & lock)
{
    chassert(!info.is_blocked());
    chassert(info.ready_seqno == 0);

    info.ready_seqno = ++last_ready_seqno;
    NOEXCEPT_SCOPE({
        ALLOW_ALLOCATIONS_IN_SCOPE;
        ready_queue.emplace(info.key(), job);
    });

    job->enqueued();

    if (!is_running || workers >= max_threads)
        return;
    spawn(lock);
}
|
||||
|
||||
void AsyncLoader::spawn(std::unique_lock<std::mutex> &)
|
||||
{
|
||||
workers++;
|
||||
NOEXCEPT_SCOPE({
|
||||
ALLOW_ALLOCATIONS_IN_SCOPE;
|
||||
pool.scheduleOrThrowOnError([this] { worker(); });
|
||||
});
|
||||
}
|
||||
|
||||
void AsyncLoader::worker()
|
||||
{
|
||||
DENY_ALLOCATIONS_IN_SCOPE;
|
||||
|
||||
LoadJobPtr job;
|
||||
std::exception_ptr exception_from_job;
|
||||
while (true)
|
||||
{
|
||||
// This is inside the loop to also reset previous thread names set inside the jobs
|
||||
setThreadName("AsyncLoader");
|
||||
|
||||
{
|
||||
std::unique_lock lock{mutex};
|
||||
|
||||
// Handle just executed job
|
||||
if (exception_from_job)
|
||||
finish(lock, job, LoadStatus::FAILED, exception_from_job);
|
||||
else if (job)
|
||||
finish(lock, job, LoadStatus::OK);
|
||||
|
||||
if (!is_running || ready_queue.empty() || workers > max_threads)
|
||||
{
|
||||
workers--;
|
||||
return;
|
||||
}
|
||||
|
||||
// Take next job to be executed from the ready queue
|
||||
auto it = ready_queue.begin();
|
||||
job = it->second;
|
||||
ready_queue.erase(it);
|
||||
scheduled_jobs.find(job)->second.ready_seqno = 0; // This job is no longer in the ready queue
|
||||
}
|
||||
|
||||
ALLOW_ALLOCATIONS_IN_SCOPE;
|
||||
|
||||
try
|
||||
{
|
||||
job->execute(job);
|
||||
exception_from_job = {};
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
NOEXCEPT_SCOPE({
|
||||
if (log_failures)
|
||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||
exception_from_job = std::make_exception_ptr(
|
||||
Exception(ErrorCodes::ASYNC_LOAD_FAILED,
|
||||
"Load job '{}' failed: {}",
|
||||
job->name,
|
||||
getCurrentExceptionMessage(/* with_stacktrace = */ true)));
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
439
src/Common/AsyncLoader.h
Normal file
439
src/Common/AsyncLoader.h
Normal file
@ -0,0 +1,439 @@
|
||||
#pragma once
|
||||
|
||||
#include <condition_variable>
|
||||
#include <exception>
|
||||
#include <memory>
|
||||
#include <map>
|
||||
#include <mutex>
|
||||
#include <vector>
|
||||
#include <unordered_set>
|
||||
#include <unordered_map>
|
||||
#include <boost/noncopyable.hpp>
|
||||
#include <base/types.h>
|
||||
#include <Common/CurrentMetrics.h>
|
||||
#include <Common/Stopwatch.h>
|
||||
#include <Common/ThreadPool.h>
|
||||
|
||||
|
||||
namespace Poco { class Logger; }
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
// Forward declarations of the classes declared in this header
class LoadJob;
class LoadTask;
class AsyncLoader;

// Jobs are shared between tasks, dependent jobs and the loader
using LoadJobPtr = std::shared_ptr<LoadJob>;
using LoadJobSet = std::unordered_set<LoadJobPtr>;

using LoadTaskPtr = std::shared_ptr<LoadTask>;
using LoadTaskPtrs = std::vector<LoadTaskPtr>;
|
||||
|
||||
// Logs progress (`processed` out of `total`) as a percentage; `watch` is used to throttle the messages.
void logAboutProgress(Poco::Logger * log, size_t processed, size_t total, AtomicStopwatch & watch);
|
||||
|
||||
// Execution status of a load job.
enum class LoadStatus
{
    // Load job is not started yet.
    PENDING,
    // Load job executed and was successful.
    OK,
    // Load job executed and failed.
    FAILED,
    // Load job is not going to be executed due to removal or dependency failure.
    CANCELED,
};
|
||||
|
||||
// Smallest indivisible part of a loading process. Load job can have multiple dependencies, thus jobs constitute a direct acyclic graph (DAG).
|
||||
// Job encapsulates a function to be executed by `AsyncLoader` as soon as job functions of all dependencies are successfully executed.
|
||||
// Job can be waited for by an arbitrary number of threads. See `AsyncLoader` class description for more details.
|
||||
class LoadJob : private boost::noncopyable
|
||||
{
|
||||
public:
|
||||
template <class Func, class LoadJobSetType>
|
||||
LoadJob(LoadJobSetType && dependencies_, String name_, Func && func_, ssize_t priority_ = 0)
|
||||
: dependencies(std::forward<LoadJobSetType>(dependencies_))
|
||||
, name(std::move(name_))
|
||||
, func(std::forward<Func>(func_))
|
||||
, load_priority(priority_)
|
||||
{}
|
||||
|
||||
// Current job status.
|
||||
LoadStatus status() const;
|
||||
std::exception_ptr exception() const;
|
||||
|
||||
// Returns current value of a priority of the job. May differ from initial priority.
|
||||
ssize_t priority() const;
|
||||
|
||||
// Sync wait for a pending job to be finished: OK, FAILED or CANCELED status.
|
||||
// Throws if job is FAILED or CANCELED. Returns or throws immediately on non-pending job.
|
||||
void wait() const;
|
||||
|
||||
// Wait for a job to reach any non PENDING status.
|
||||
void waitNoThrow() const noexcept;
|
||||
|
||||
// Returns number of threads blocked by `wait()` or `waitNoThrow()` calls.
|
||||
size_t waitersCount() const;
|
||||
|
||||
// Introspection
|
||||
using TimePoint = std::chrono::system_clock::time_point;
|
||||
TimePoint scheduleTime() const { return schedule_time; }
|
||||
TimePoint enqueueTime() const { return enqueue_time; }
|
||||
TimePoint startTime() const { return start_time; }
|
||||
TimePoint finishTime() const { return finish_time; }
|
||||
|
||||
const LoadJobSet dependencies; // Jobs to be done before this one (with ownership), it is `const` to make creation of cycles hard
|
||||
const String name;
|
||||
|
||||
private:
|
||||
friend class AsyncLoader;
|
||||
|
||||
void ok();
|
||||
void failed(const std::exception_ptr & ptr);
|
||||
void canceled(const std::exception_ptr & ptr);
|
||||
void finish();
|
||||
|
||||
void scheduled();
|
||||
void enqueued();
|
||||
void execute(const LoadJobPtr & self);
|
||||
|
||||
std::function<void(const LoadJobPtr & self)> func;
|
||||
std::atomic<ssize_t> load_priority;
|
||||
|
||||
mutable std::mutex mutex;
|
||||
mutable std::condition_variable finished;
|
||||
mutable size_t waiters = 0;
|
||||
LoadStatus load_status{LoadStatus::PENDING};
|
||||
std::exception_ptr load_exception;
|
||||
|
||||
std::atomic<TimePoint> schedule_time{TimePoint{}};
|
||||
std::atomic<TimePoint> enqueue_time{TimePoint{}};
|
||||
std::atomic<TimePoint> start_time{TimePoint{}};
|
||||
std::atomic<TimePoint> finish_time{TimePoint{}};
|
||||
};
|
||||
|
||||
struct EmptyJobFunc
|
||||
{
|
||||
void operator()(const LoadJobPtr &) {}
|
||||
};
|
||||
|
||||
template <class Func = EmptyJobFunc>
|
||||
LoadJobPtr makeLoadJob(LoadJobSet && dependencies, String name, Func && func = EmptyJobFunc())
|
||||
{
|
||||
return std::make_shared<LoadJob>(std::move(dependencies), std::move(name), std::forward<Func>(func));
|
||||
}
|
||||
|
||||
template <class Func = EmptyJobFunc>
|
||||
LoadJobPtr makeLoadJob(const LoadJobSet & dependencies, String name, Func && func = EmptyJobFunc())
|
||||
{
|
||||
return std::make_shared<LoadJob>(dependencies, std::move(name), std::forward<Func>(func));
|
||||
}
|
||||
|
||||
template <class Func = EmptyJobFunc>
|
||||
LoadJobPtr makeLoadJob(LoadJobSet && dependencies, ssize_t priority, String name, Func && func = EmptyJobFunc())
|
||||
{
|
||||
return std::make_shared<LoadJob>(std::move(dependencies), std::move(name), std::forward<Func>(func), priority);
|
||||
}
|
||||
|
||||
template <class Func = EmptyJobFunc>
|
||||
LoadJobPtr makeLoadJob(const LoadJobSet & dependencies, ssize_t priority, String name, Func && func = EmptyJobFunc())
|
||||
{
|
||||
return std::make_shared<LoadJob>(dependencies, std::move(name), std::forward<Func>(func), priority);
|
||||
}
|
||||
|
||||
// Represents a logically connected set of LoadJobs required to achieve some goals (final LoadJob in the set).
|
||||
class LoadTask : private boost::noncopyable
|
||||
{
|
||||
public:
|
||||
LoadTask(AsyncLoader & loader_, LoadJobSet && jobs_, LoadJobSet && goal_jobs_ = {});
|
||||
~LoadTask();
|
||||
|
||||
// Merge all jobs from other task into this task.
|
||||
void merge(const LoadTaskPtr & task);
|
||||
|
||||
// Schedule all jobs with AsyncLoader.
|
||||
void schedule();
|
||||
|
||||
// Remove all jobs of this task from AsyncLoader.
|
||||
void remove();
|
||||
|
||||
// Do not track jobs in this task.
|
||||
// WARNING: Jobs will never be removed() and are going to be stored as finished jobs until ~AsyncLoader().
|
||||
void detach();
|
||||
|
||||
// Return the final jobs in this tasks. This job subset should be used as `dependencies` for dependent jobs or tasks:
|
||||
// auto load_task = loadSomethingAsync(async_loader, load_after_task.goals(), something);
|
||||
const LoadJobSet & goals() const { return goal_jobs.empty() ? jobs : goal_jobs; }
|
||||
|
||||
private:
|
||||
friend class AsyncLoader;
|
||||
|
||||
AsyncLoader & loader;
|
||||
LoadJobSet jobs;
|
||||
LoadJobSet goal_jobs;
|
||||
};
|
||||
|
||||
// Creates a task bound to `loader`; both job sets are moved into the task.
inline LoadTaskPtr makeLoadTask(AsyncLoader & loader, LoadJobSet && jobs, LoadJobSet && goals = {})
{
    auto task = std::make_shared<LoadTask>(loader, std::move(jobs), std::move(goals));
    return task;
}
|
||||
|
||||
inline void scheduleLoad(const LoadTaskPtr & task)
|
||||
{
|
||||
task->schedule();
|
||||
}
|
||||
|
||||
inline void scheduleLoad(const LoadTaskPtrs & tasks)
|
||||
{
|
||||
for (const auto & task : tasks)
|
||||
task->schedule();
|
||||
}
|
||||
|
||||
// Schedules every argument, in order. Each argument must be accepted by one of the single-argument
// `scheduleLoad()` overloads (a task or a vector of tasks).
template <class... Args>
inline void scheduleLoad(Args && ... args)
{
    // Every argument is dispatched as a const lvalue. The single-argument overloads take `const &`, so for
    // a non-const lvalue or an rvalue argument overload resolution would otherwise prefer this very template
    // (its forwarding reference is a better reference binding) and the function would call itself forever.
    const auto schedule_one = [](const auto & arg) { scheduleLoad(arg); };
    (schedule_one(args), ...);
}
|
||||
|
||||
inline void waitLoad(const LoadJobSet & jobs)
|
||||
{
|
||||
for (const auto & job : jobs)
|
||||
job->wait();
|
||||
}
|
||||
|
||||
inline void waitLoad(const LoadTaskPtr & task)
|
||||
{
|
||||
waitLoad(task->goals());
|
||||
}
|
||||
|
||||
inline void waitLoad(const LoadTaskPtrs & tasks)
|
||||
{
|
||||
for (const auto & task : tasks)
|
||||
waitLoad(task->goals());
|
||||
}
|
||||
|
||||
// Waits for every argument, in order. Each argument must be accepted by one of the single-argument
// `waitLoad()` overloads (a job set, a task or a vector of tasks).
template <class... Args>
inline void waitLoad(Args && ... args)
{
    // Every argument is dispatched as a const lvalue. The single-argument overloads take `const &`, so for
    // a non-const lvalue or an rvalue argument overload resolution would otherwise prefer this very template
    // (its forwarding reference is a better reference binding) and the function would call itself forever.
    const auto wait_one = [](const auto & arg) { waitLoad(arg); };
    (wait_one(args), ...);
}
|
||||
|
||||
// Schedules all the arguments and then waits for all of them.
// Accepts the same arguments as `scheduleLoad()` and `waitLoad()`.
template <class... Args>
inline void scheduleAndWaitLoad(Args && ... args)
{
    // The same arguments are used twice, so they must not be forwarded (an rvalue could be consumed by the first call).
    // Passing them as const lvalues also selects the `const &` overloads of `scheduleLoad()` / `waitLoad()` directly
    // instead of their forwarding variadic templates.
    const auto schedule_then_wait = [](const auto & ... items)
    {
        scheduleLoad(items...);
        waitLoad(items...);
    };
    schedule_then_wait(args...);
}
|
||||
|
||||
// Returns the union of the goal jobs of all the given tasks.
inline LoadJobSet getGoals(const LoadTaskPtrs & tasks)
{
    LoadJobSet all_goals;
    for (const auto & task : tasks)
    {
        const LoadJobSet & task_goals = task->goals();
        all_goals.insert(task_goals.begin(), task_goals.end());
    }
    return all_goals;
}
|
||||
|
||||
// Returns the union of two job sets.
inline LoadJobSet joinJobs(const LoadJobSet & jobs1, const LoadJobSet & jobs2)
{
    LoadJobSet joined;
    for (const LoadJobSet * part : {&jobs1, &jobs2})
    {
        if (part->empty())
            continue;
        joined.insert(part->begin(), part->end());
    }
    return joined;
}
|
||||
|
||||
// Returns the concatenation of two task lists: all of `tasks1` followed by all of `tasks2`.
// Duplicates are not removed.
inline LoadTaskPtrs joinTasks(const LoadTaskPtrs & tasks1, const LoadTaskPtrs & tasks2)
{
    // Shortcuts: when one of the lists is empty, the result is a plain copy of the other one
    if (tasks1.empty())
        return tasks2;
    if (tasks2.empty())
        return tasks1;

    LoadTaskPtrs joined;
    joined.reserve(tasks1.size() + tasks2.size());
    for (const auto & task : tasks1)
        joined.push_back(task);
    for (const auto & task : tasks2)
        joined.push_back(task);
    return joined;
}
|
||||
|
||||
// `AsyncLoader` is a scheduler for DAG of `LoadJob`s. It tracks dependencies and priorities of jobs.
|
||||
// Basic usage example:
|
||||
// auto job_func = [&] (const LoadJobPtr & self) {
|
||||
// LOG_TRACE(log, "Executing load job '{}' with priority '{}'", self->name, self->priority());
|
||||
// };
|
||||
// auto job1 = makeLoadJob({}, "job1", job_func);
|
||||
// auto job2 = makeLoadJob({ job1 }, "job2", job_func);
|
||||
// auto job3 = makeLoadJob({ job1 }, "job3", job_func);
|
||||
// auto task = makeLoadTask(async_loader, { job1, job2, job3 });
|
||||
// task->schedule();
|
||||
// Here we have created and scheduled a task consisting of three jobs. Job1 has no dependencies and is run first.
|
||||
// Job2 and job3 depend on job1 and are run only after job1 completion. Another thread may prioritize a job and wait for it:
|
||||
// async_loader.prioritize(job3, /* priority = */ 1); // higher priority jobs are run first, default priority is zero.
|
||||
// job3->wait(); // blocks until job completion or cancellation and rethrows an exception (if any)
|
||||
//
|
||||
// AsyncLoader tracks state of all scheduled jobs. Job lifecycle is the following:
|
||||
// 1) Job is constructed with PENDING status and initial priority. The job is placed into a task.
|
||||
// 2) The task is scheduled with all its jobs and their dependencies. A scheduled job may be ready (i.e. have all its dependencies finished) or blocked.
|
||||
// 3a) When all dependencies are successfully executed, the job becomes ready. A ready job is enqueued into the ready queue.
|
||||
// 3b) If at least one of the job dependencies is failed or canceled, then this job is canceled (with all its dependent jobs as well).
|
||||
// On cancellation an ASYNC_LOAD_CANCELED exception is generated and saved inside LoadJob object. The job status is changed to CANCELED.
|
||||
// Exception is rethrown by any existing or new `wait()` call. The job is moved to the set of the finished jobs.
|
||||
// 4) The scheduled pending ready job starts execution by a worker. The job is dequeued. Callback `job_func` is called.
|
||||
// Status of an executing job is PENDING. And it is still considered as a scheduled job by AsyncLoader.
|
||||
// Note that `job_func` of a CANCELED job is never executed.
|
||||
// 5a) On successful execution the job status is changed to OK and all existing and new `wait()` calls finish w/o exceptions.
|
||||
// 5b) Any exception thrown out of `job_func` is wrapped into an ASYNC_LOAD_FAILED exception and saved inside LoadJob.
|
||||
// The job status is changed to FAILED. All the dependent jobs are canceled. The exception is rethrown from all existing and new `wait()` calls.
|
||||
// 6) The job is no longer considered as scheduled and is instead moved to the finished jobs set. This is just for introspection of the finished jobs.
|
||||
// 7) The task containing this job is destructed or `remove()` is explicitly called. The job is removed from the finished job set.
|
||||
// 8) The job is destructed.
|
||||
//
|
||||
// Every job has a priority associated with it. AsyncLoader runs higher priority (greater `priority` value) jobs first. Job priority can be elevated
|
||||
// (a) if either it has a dependent job with higher priority (in this case priority of a dependent job is inherited);
|
||||
// (b) or job was explicitly prioritized by `prioritize(job, higher_priority)` call (this also leads to a priority inheritance for all the dependencies).
|
||||
// Note that to avoid priority inversion `job_func` should use `self->priority()` to schedule new jobs in AsyncLoader or any other pool.
|
||||
// Value stored in load job priority field is atomic and can be increased even during job execution.
|
||||
//
|
||||
// When a task is scheduled it can contain dependencies on previously scheduled jobs. These jobs can have any status. If job A being scheduled depends on
|
||||
// another job B that is not yet scheduled, then job B will also be scheduled (even if the task does not contain it).
|
||||
class AsyncLoader : private boost::noncopyable
|
||||
{
|
||||
private:
|
||||
// Key of a pending job in the ready queue.
|
||||
struct ReadyKey
|
||||
{
|
||||
ssize_t priority; // Ascending order
|
||||
ssize_t initial_priority; // Ascending order
|
||||
UInt64 ready_seqno; // Descending order
|
||||
|
||||
bool operator<(const ReadyKey & rhs) const
|
||||
{
|
||||
if (priority > rhs.priority)
|
||||
return true;
|
||||
if (priority < rhs.priority)
|
||||
return false;
|
||||
if (initial_priority > rhs.initial_priority)
|
||||
return true;
|
||||
if (initial_priority < rhs.initial_priority)
|
||||
return false;
|
||||
return ready_seqno < rhs.ready_seqno;
|
||||
}
|
||||
};
|
||||
|
||||
// Scheduling information for a pending job.
|
||||
struct Info
|
||||
{
|
||||
ssize_t initial_priority = 0; // Initial priority passed into schedule().
|
||||
ssize_t priority = 0; // Elevated priority, due to priority inheritance or prioritize().
|
||||
size_t dependencies_left = 0; // Current number of dependencies on pending jobs.
|
||||
UInt64 ready_seqno = 0; // Zero means that job is not in ready queue.
|
||||
LoadJobSet dependent_jobs; // Set of jobs dependent on this job.
|
||||
|
||||
// Three independent states of a non-finished job.
|
||||
bool is_blocked() const { return dependencies_left > 0; }
|
||||
bool is_ready() const { return dependencies_left == 0 && ready_seqno > 0; }
|
||||
bool is_executing() const { return dependencies_left == 0 && ready_seqno == 0; }
|
||||
|
||||
// Get key of a ready job
|
||||
ReadyKey key() const
|
||||
{
|
||||
return {.priority = priority, .initial_priority = initial_priority, .ready_seqno = ready_seqno};
|
||||
}
|
||||
};
|
||||
|
||||
public:
|
||||
using Metric = CurrentMetrics::Metric;
|
||||
|
||||
AsyncLoader(Metric metric_threads, Metric metric_active_threads, size_t max_threads_, bool log_failures_, bool log_progress_);
|
||||
|
||||
// WARNING: all tasks instances should be destructed before associated AsyncLoader.
|
||||
~AsyncLoader();
|
||||
|
||||
// Start workers to execute scheduled load jobs.
|
||||
void start();
|
||||
|
||||
// Wait for all load jobs to finish, including all new jobs. So at first take care to stop adding new jobs.
|
||||
void wait();
|
||||
|
||||
// Wait for currently executing jobs to finish, but do not run any other pending jobs.
|
||||
// Not finished jobs are left in pending state:
|
||||
// - they can be executed by calling start() again;
|
||||
// - or canceled using ~Task() or remove() later.
|
||||
void stop();
|
||||
|
||||
// Schedule all jobs of given `task` and their dependencies (if any, not scheduled yet).
|
||||
// Higher priority jobs (with greater `job->priority()` value) are executed earlier.
|
||||
// All dependencies of a scheduled job inherit its priority if it is higher. This way higher priority job
|
||||
// never wait for (blocked by) lower priority jobs. No priority inversion is possible.
|
||||
// Note that `task` destructor ensures that all its jobs are finished (OK, FAILED or CANCELED)
|
||||
// and are removed from AsyncLoader, so it is thread-safe to destroy them.
|
||||
void schedule(LoadTask & task);
|
||||
void schedule(const LoadTaskPtr & task);
|
||||
|
||||
// Schedule all tasks atomically. To ensure only highest priority jobs among all tasks are run first.
|
||||
void schedule(const std::vector<LoadTaskPtr> & tasks);
|
||||
|
||||
// Increase priority of a job and all its dependencies recursively.
|
||||
void prioritize(const LoadJobPtr & job, ssize_t new_priority);
|
||||
|
||||
// Remove finished jobs, cancel scheduled jobs, wait for executing jobs to finish and remove them.
|
||||
void remove(const LoadJobSet & jobs);
|
||||
|
||||
// Increase or decrease maximum number of simultaneously executing jobs.
|
||||
void setMaxThreads(size_t value);
|
||||
|
||||
size_t getMaxThreads() const;
|
||||
size_t getScheduledJobCount() const;
|
||||
|
||||
// Helper class for introspection
|
||||
struct JobState
|
||||
{
|
||||
LoadJobPtr job;
|
||||
size_t dependencies_left = 0;
|
||||
bool is_executing = false;
|
||||
bool is_blocked = false;
|
||||
bool is_ready = false;
|
||||
std::optional<ssize_t> initial_priority;
|
||||
std::optional<UInt64> ready_seqno;
|
||||
};
|
||||
|
||||
// For introspection and debug only, see `system.async_loader` table
|
||||
std::vector<JobState> getJobStates() const;
|
||||
|
||||
private:
|
||||
void checkCycle(const LoadJobSet & jobs, std::unique_lock<std::mutex> & lock);
|
||||
String checkCycleImpl(const LoadJobPtr & job, LoadJobSet & left, LoadJobSet & visited, std::unique_lock<std::mutex> & lock);
|
||||
void finish(std::unique_lock<std::mutex> & lock, const LoadJobPtr & job, LoadStatus status, std::exception_ptr exception_from_job = {});
|
||||
void scheduleImpl(const LoadJobSet & input_jobs);
|
||||
void gatherNotScheduled(const LoadJobPtr & job, LoadJobSet & jobs, std::unique_lock<std::mutex> & lock);
|
||||
void prioritize(const LoadJobPtr & job, ssize_t new_priority, std::unique_lock<std::mutex> & lock);
|
||||
void enqueue(Info & info, const LoadJobPtr & job, std::unique_lock<std::mutex> & lock);
|
||||
void spawn(std::unique_lock<std::mutex> &);
|
||||
void worker();
|
||||
|
||||
// Logging
|
||||
const bool log_failures; // Worker should log all exceptions caught from job functions.
|
||||
const bool log_progress; // Periodically log total progress
|
||||
Poco::Logger * log;
|
||||
std::chrono::system_clock::time_point busy_period_start_time;
|
||||
AtomicStopwatch stopwatch;
|
||||
size_t old_jobs = 0; // Number of jobs that were finished in previous busy period (for correct progress indication)
|
||||
|
||||
mutable std::mutex mutex; // Guards all the fields below.
|
||||
bool is_running = false;
|
||||
|
||||
// Full set of scheduled pending jobs along with scheduling info.
|
||||
std::unordered_map<LoadJobPtr, Info> scheduled_jobs;
|
||||
|
||||
// Subset of scheduled pending non-blocked jobs (waiting for a worker to be executed).
|
||||
// Represent a queue of jobs in order of decreasing priority and FIFO for jobs with equal priorities.
|
||||
std::map<ReadyKey, LoadJobPtr> ready_queue;
|
||||
|
||||
// Set of finished jobs (for introspection only, until jobs are removed).
|
||||
LoadJobSet finished_jobs;
|
||||
|
||||
// Increasing counter for `ReadyKey` assignment (to preserve FIFO order of the jobs with equal priorities).
|
||||
UInt64 last_ready_seqno = 0;
|
||||
|
||||
// For executing jobs. Note that we avoid using an internal queue of the pool to be able to prioritize jobs.
|
||||
size_t max_threads;
|
||||
size_t workers = 0;
|
||||
ThreadPool pool;
|
||||
};
|
||||
|
||||
}
|
@ -576,6 +576,9 @@
|
||||
M(691, UNKNOWN_ELEMENT_OF_ENUM) \
|
||||
M(692, TOO_MANY_MUTATIONS) \
|
||||
M(693, AWS_ERROR) \
|
||||
M(694, ASYNC_LOAD_CYCLE) \
|
||||
M(695, ASYNC_LOAD_FAILED) \
|
||||
M(696, ASYNC_LOAD_CANCELED) \
|
||||
\
|
||||
M(999, KEEPER_EXCEPTION) \
|
||||
M(1000, POCO_EXCEPTION) \
|
||||
|
749
src/Common/tests/gtest_async_loader.cpp
Normal file
749
src/Common/tests/gtest_async_loader.cpp
Normal file
@ -0,0 +1,749 @@
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include <barrier>
|
||||
#include <chrono>
|
||||
#include <mutex>
|
||||
#include <stdexcept>
|
||||
#include <string_view>
|
||||
#include <vector>
|
||||
#include <thread>
|
||||
#include <pcg_random.hpp>
|
||||
|
||||
#include <base/types.h>
|
||||
#include <base/sleep.h>
|
||||
#include <Common/Exception.h>
|
||||
#include <Common/AsyncLoader.h>
|
||||
#include <Common/randomSeed.h>
|
||||
|
||||
using namespace DB;
|
||||
|
||||
namespace CurrentMetrics
|
||||
{
|
||||
extern const Metric TablesLoaderThreads;
|
||||
extern const Metric TablesLoaderThreadsActive;
|
||||
}
|
||||
|
||||
namespace DB::ErrorCodes
|
||||
{
|
||||
extern const int ASYNC_LOAD_CYCLE;
|
||||
extern const int ASYNC_LOAD_FAILED;
|
||||
extern const int ASYNC_LOAD_CANCELED;
|
||||
}
|
||||
|
||||
struct AsyncLoaderTest
|
||||
{
|
||||
AsyncLoader loader;
|
||||
|
||||
std::mutex rng_mutex;
|
||||
pcg64 rng{randomSeed()};
|
||||
|
||||
explicit AsyncLoaderTest(size_t max_threads = 1)
|
||||
: loader(CurrentMetrics::TablesLoaderThreads, CurrentMetrics::TablesLoaderThreadsActive, max_threads, /* log_failures = */ false, /* log_progress = */ false)
|
||||
{}
|
||||
|
||||
template <typename T>
|
||||
T randomInt(T from, T to)
|
||||
{
|
||||
std::uniform_int_distribution<T> distribution(from, to);
|
||||
std::scoped_lock lock(rng_mutex);
|
||||
return distribution(rng);
|
||||
}
|
||||
|
||||
void randomSleepUs(UInt64 min_us, UInt64 max_us, int probability_percent)
|
||||
{
|
||||
if (randomInt(0, 99) < probability_percent)
|
||||
std::this_thread::sleep_for(std::chrono::microseconds(randomInt(min_us, max_us)));
|
||||
}
|
||||
|
||||
template <typename JobFunc>
|
||||
LoadJobSet randomJobSet(int job_count, int dep_probability_percent, JobFunc job_func, std::string_view name_prefix = "job")
|
||||
{
|
||||
std::vector<LoadJobPtr> jobs;
|
||||
jobs.reserve(job_count);
|
||||
for (int j = 0; j < job_count; j++)
|
||||
{
|
||||
LoadJobSet deps;
|
||||
for (int d = 0; d < j; d++)
|
||||
{
|
||||
if (randomInt(0, 99) < dep_probability_percent)
|
||||
deps.insert(jobs[d]);
|
||||
}
|
||||
jobs.push_back(makeLoadJob(std::move(deps), fmt::format("{}{}", name_prefix, j), job_func));
|
||||
}
|
||||
return {jobs.begin(), jobs.end()};
|
||||
}
|
||||
|
||||
template <typename JobFunc>
|
||||
LoadJobSet randomJobSet(int job_count, int dep_probability_percent, const std::vector<LoadJobPtr> & external_deps, JobFunc job_func, std::string_view name_prefix = "job")
|
||||
{
|
||||
std::vector<LoadJobPtr> jobs;
|
||||
jobs.reserve(job_count);
|
||||
for (int j = 0; j < job_count; j++)
|
||||
{
|
||||
LoadJobSet deps;
|
||||
for (int d = 0; d < j; d++)
|
||||
{
|
||||
if (randomInt(0, 99) < dep_probability_percent)
|
||||
deps.insert(jobs[d]);
|
||||
}
|
||||
if (!external_deps.empty() && randomInt(0, 99) < dep_probability_percent)
|
||||
deps.insert(external_deps[randomInt<size_t>(0, external_deps.size() - 1)]);
|
||||
jobs.push_back(makeLoadJob(std::move(deps), fmt::format("{}{}", name_prefix, j), job_func));
|
||||
}
|
||||
return {jobs.begin(), jobs.end()};
|
||||
}
|
||||
|
||||
template <typename JobFunc>
|
||||
LoadJobSet chainJobSet(int job_count, JobFunc job_func, std::string_view name_prefix = "job")
|
||||
{
|
||||
std::vector<LoadJobPtr> jobs;
|
||||
jobs.reserve(job_count);
|
||||
jobs.push_back(makeLoadJob({}, fmt::format("{}{}", name_prefix, 0), job_func));
|
||||
for (int j = 1; j < job_count; j++)
|
||||
jobs.push_back(makeLoadJob({ jobs[j - 1] }, fmt::format("{}{}", name_prefix, j), job_func));
|
||||
return {jobs.begin(), jobs.end()};
|
||||
}
|
||||
|
||||
LoadTaskPtr schedule(LoadJobSet && jobs)
|
||||
{
|
||||
LoadTaskPtr task = makeLoadTask(loader, std::move(jobs));
|
||||
task->schedule();
|
||||
return task;
|
||||
}
|
||||
};
|
||||
|
||||
// Schedules five jobs split across tasks (one of them with a lower priority), runs them on two threads
// and checks that every job was executed exactly once.
TEST(AsyncLoader, Smoke)
{
    AsyncLoaderTest t(2);

    static constexpr ssize_t low_priority = -1;

    std::atomic<size_t> executed_total{0};
    std::atomic<size_t> executed_low_priority{0};

    auto counting_job = [&] (const LoadJobPtr & self)
    {
        ++executed_total;
        if (self->priority() == low_priority)
            ++executed_low_priority;
    };

    {
        auto job1 = makeLoadJob({}, "job1", counting_job);
        auto job2 = makeLoadJob({ job1 }, "job2", counting_job);
        auto task1 = t.schedule({ job1, job2 });

        auto job3 = makeLoadJob({ job2 }, "job3", counting_job);
        auto job4 = makeLoadJob({ job2 }, "job4", counting_job);
        auto task2 = t.schedule({ job3, job4 });
        auto job5 = makeLoadJob({ job3, job4 }, low_priority, "job5", counting_job);
        task2->merge(t.schedule({ job5 }));

        // The waiter starts before the loader does, so it blocks on a job that is still pending.
        std::thread waiter_thread([job5] { job5->wait(); });

        t.loader.start();

        job3->wait();
        t.loader.wait();
        job4->wait();

        waiter_thread.join();

        ASSERT_EQ(job1->status(), LoadStatus::OK);
        ASSERT_EQ(job2->status(), LoadStatus::OK);
    }

    ASSERT_EQ(executed_total, 5);
    ASSERT_EQ(executed_low_priority, 1);

    t.loader.stop();
}
|
||||
|
||||
// Builds a dependency graph with an artificial cycle (job1 -> job3 -> job2 -> job1) and checks that scheduling
// throws an exception whose message names exactly the jobs of the cycle.
// All checks are non-fatal on purpose: the cycle must be broken at the end of the test in any case,
// otherwise the jobs keep each other alive and leak.
TEST(AsyncLoader, CycleDetection)
{
    AsyncLoaderTest t;

    auto job_func = [&] (const LoadJobPtr &) {};

    LoadJobPtr cycle_breaker; // To avoid the memory leak that the cycle would otherwise introduce

    try
    {
        std::vector<LoadJobPtr> jobs;
        jobs.reserve(16);
        jobs.push_back(makeLoadJob({}, "job0", job_func));
        jobs.push_back(makeLoadJob({ jobs[0] }, "job1", job_func));
        jobs.push_back(makeLoadJob({ jobs[0], jobs[1] }, "job2", job_func));
        jobs.push_back(makeLoadJob({ jobs[0], jobs[2] }, "job3", job_func));

        // Actually it is hard to construct a cycle, but suppose someone was able to succeed violating constness
        const_cast<LoadJobSet &>(jobs[1]->dependencies).insert(jobs[3]);
        cycle_breaker = jobs[1];

        // Add couple unrelated jobs
        jobs.push_back(makeLoadJob({ jobs[1] }, "job4", job_func));
        jobs.push_back(makeLoadJob({ jobs[4] }, "job5", job_func));
        jobs.push_back(makeLoadJob({ jobs[3] }, "job6", job_func));
        jobs.push_back(makeLoadJob({ jobs[1], jobs[2], jobs[3], jobs[4], jobs[5], jobs[6] }, "job7", job_func));

        // Also add another not connected jobs
        jobs.push_back(makeLoadJob({}, "job8", job_func));
        jobs.push_back(makeLoadJob({}, "job9", job_func));
        jobs.push_back(makeLoadJob({ jobs[9] }, "job10", job_func));

        auto task1 = t.schedule({ jobs.begin(), jobs.end()});
        ADD_FAILURE() << "scheduling a cyclic graph is expected to throw"; // non-fatal, the cleanup below still runs
    }
    catch (const Exception & e)
    {
        // present[i] tells whether "job<i>" has to be mentioned in the exception message.
        const bool present[] = { false, true, true, true, false, false, false, false, false, false, false };
        const String message = e.message();
        for (size_t i = 0; i < std::size(present); i++)
            EXPECT_EQ(message.find(fmt::format("job{}", i)) != String::npos, present[i]);
    }

    // Break the cycle, so that the jobs can be destructed.
    if (cycle_breaker)
        const_cast<LoadJobSet &>(cycle_breaker->dependencies).clear();
}
|
||||
|
||||
// Removing a task before the loader is started cancels its pending job; `wait()` on the job rethrows the cancellation.
TEST(AsyncLoader, CancelPendingJob)
{
    AsyncLoaderTest t;

    auto noop = [&] (const LoadJobPtr &) {};

    auto job = makeLoadJob({}, "job", noop);
    auto task = t.schedule({ job });

    task->remove(); // this cancels the pending job (async loader was not started to execute it)

    ASSERT_EQ(job->status(), LoadStatus::CANCELED);
    try
    {
        job->wait();
        FAIL();
    }
    catch (const Exception & e)
    {
        ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_CANCELED);
    }
}
|
||||
|
||||
// Removing a not yet started task cancels all of its jobs; `wait()` on each of them rethrows the cancellation.
TEST(AsyncLoader, CancelPendingTask)
{
    AsyncLoaderTest t;

    auto noop = [&] (const LoadJobPtr &) {};

    auto job1 = makeLoadJob({}, "job1", noop);
    auto job2 = makeLoadJob({ job1 }, "job2", noop);
    auto task = t.schedule({ job1, job2 });

    task->remove(); // this cancels both jobs (async loader was not started to execute it)

    ASSERT_EQ(job1->status(), LoadStatus::CANCELED);
    ASSERT_EQ(job2->status(), LoadStatus::CANCELED);

    try
    {
        job1->wait();
        FAIL();
    }
    catch (const Exception & e)
    {
        ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_CANCELED);
    }

    try
    {
        job2->wait();
        FAIL();
    }
    catch (const Exception & e)
    {
        ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_CANCELED);
    }
}
|
||||
|
||||
// Canceling a job also cancels the job of another task that depends on it.
TEST(AsyncLoader, CancelPendingDependency)
{
    AsyncLoaderTest t;

    auto noop = [&] (const LoadJobPtr &) {};

    auto job1 = makeLoadJob({}, "job1", noop);
    auto job2 = makeLoadJob({ job1 }, "job2", noop);
    auto task1 = t.schedule({ job1 });
    auto task2 = t.schedule({ job2 });

    task1->remove(); // this cancels both jobs, due to dependency (async loader was not started to execute it)

    ASSERT_EQ(job1->status(), LoadStatus::CANCELED);
    ASSERT_EQ(job2->status(), LoadStatus::CANCELED);

    try
    {
        job1->wait();
        FAIL();
    }
    catch (const Exception & e)
    {
        ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_CANCELED);
    }

    try
    {
        job2->wait();
        FAIL();
    }
    catch (const Exception & e)
    {
        ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_CANCELED);
    }
}
|
||||
|
||||
// `remove()` of a task whose job is already executing does not cancel the job: it waits for the job to finish.
// Synchronization points: (A) the job has started, (B) a waiter on the job has been observed, (C) the job has finished.
TEST(AsyncLoader, CancelExecutingJob)
{
    AsyncLoaderTest t;
    t.loader.start();

    std::barrier sync(2);

    auto job_func = [&] (const LoadJobPtr &)
    {
        sync.arrive_and_wait(); // (A) sync with main thread
        sync.arrive_and_wait(); // (B) wait for waiter
        // signals (C)
    };

    auto job = makeLoadJob({}, "job", job_func);
    auto task = t.schedule({ job });

    sync.arrive_and_wait(); // (A) wait for job to start executing
    std::thread canceler([&]
    {
        task->remove(); // waits for (C)
    });
    while (job->waitersCount() == 0)
        std::this_thread::yield();
    // Non-fatal check on purpose: returning from here would destroy the still joinable `canceler` thread
    // (which terminates the process) and would leave the job parked on the barrier forever.
    EXPECT_EQ(job->status(), LoadStatus::PENDING);
    sync.arrive_and_wait(); // (B) sync with job
    canceler.join();

    ASSERT_EQ(job->status(), LoadStatus::OK);
    job->wait();
}
|
||||
|
||||
// Removing a task while one of its jobs is executing: the executing job finishes successfully, all the other jobs
// of the task are canceled without being run, and a dependent job of another task is still executed.
// Synchronization points: (A) the blocker has started, (B) a waiter on the blocker has been observed, (C) the blocker has finished.
TEST(AsyncLoader, CancelExecutingTask)
{
    AsyncLoaderTest t(16);
    t.loader.start();
    std::barrier sync(2);

    auto blocker_job_func = [&] (const LoadJobPtr &)
    {
        sync.arrive_and_wait(); // (A) sync with main thread
        sync.arrive_and_wait(); // (B) wait for waiter
        // signals (C)
    };

    auto job_to_cancel_func = [&] (const LoadJobPtr &)
    {
        FAIL(); // this job should be canceled
    };

    auto job_to_succeed_func = [&] (const LoadJobPtr &)
    {
    };

    // Make several iterations to catch the race (if any)
    for (int iteration = 0; iteration < 10; iteration++)
    {
        std::vector<LoadJobPtr> task1_jobs;
        task1_jobs.reserve(256);
        auto blocker_job = makeLoadJob({}, "blocker_job", blocker_job_func);
        task1_jobs.push_back(blocker_job);
        for (int i = 0; i < 100; i++)
            task1_jobs.push_back(makeLoadJob({ blocker_job }, "job_to_cancel", job_to_cancel_func));
        auto task1 = t.schedule({ task1_jobs.begin(), task1_jobs.end() });
        auto job_to_succeed = makeLoadJob({ blocker_job }, "job_to_succeed", job_to_succeed_func);
        auto task2 = t.schedule({ job_to_succeed });

        sync.arrive_and_wait(); // (A) wait for job to start executing
        std::thread canceler([&]
        {
            task1->remove(); // waits for (C)
        });
        while (blocker_job->waitersCount() == 0)
            std::this_thread::yield();
        // Non-fatal check on purpose: returning from here would destroy the still joinable `canceler` thread
        // (which terminates the process) and would leave the blocker parked on the barrier forever.
        EXPECT_EQ(blocker_job->status(), LoadStatus::PENDING);
        sync.arrive_and_wait(); // (B) sync with job
        canceler.join();
        t.loader.wait();

        ASSERT_EQ(blocker_job->status(), LoadStatus::OK);
        ASSERT_EQ(job_to_succeed->status(), LoadStatus::OK);
        for (const auto & job : task1_jobs)
        {
            if (job != blocker_job)
                ASSERT_EQ(job->status(), LoadStatus::CANCELED);
        }
    }
}
|
||||
|
||||
// A job function that throws makes the job FAILED; `wait()` rethrows an ASYNC_LOAD_FAILED exception
// whose message contains the text of the original error.
TEST(AsyncLoader, DISABLED_JobFailure)
{
    AsyncLoaderTest t;
    t.loader.start();

    std::string error_message = "test job failure";

    auto throwing_job = [&] (const LoadJobPtr &)
    {
        throw std::runtime_error(error_message);
    };

    auto job = makeLoadJob({}, "job", throwing_job);
    auto task = t.schedule({ job });

    t.loader.wait();

    ASSERT_EQ(job->status(), LoadStatus::FAILED);
    try
    {
        job->wait();
        FAIL();
    }
    catch (const Exception & e)
    {
        ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_FAILED);
        ASSERT_TRUE(e.message().find(error_message) != String::npos);
    }
}
|
||||
|
||||
// Jobs scheduled after their dependency has already failed are canceled (transitively),
// and the cancellation exception carries the message of the original failure.
TEST(AsyncLoader, ScheduleJobWithFailedDependencies)
{
    AsyncLoaderTest t;
    t.loader.start();

    std::string_view error_message = "test job failure";

    auto failed_job_func = [&] (const LoadJobPtr &)
    {
        throw Exception(ErrorCodes::ASYNC_LOAD_FAILED, "{}", error_message);
    };

    auto failed_job = makeLoadJob({}, "failed_job", failed_job_func);
    auto failed_task = t.schedule({ failed_job });

    t.loader.wait();

    auto noop = [&] (const LoadJobPtr &) {};

    auto job1 = makeLoadJob({ failed_job }, "job1", noop);
    auto job2 = makeLoadJob({ job1 }, "job2", noop);
    auto task = t.schedule({ job1, job2 });

    t.loader.wait();

    ASSERT_EQ(job1->status(), LoadStatus::CANCELED);
    ASSERT_EQ(job2->status(), LoadStatus::CANCELED);

    // Direct dependent of the failed job.
    try
    {
        job1->wait();
        FAIL();
    }
    catch (const Exception & e)
    {
        ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_CANCELED);
        ASSERT_TRUE(e.message().find(error_message) != String::npos);
    }

    // Indirect dependent of the failed job.
    try
    {
        job2->wait();
        FAIL();
    }
    catch (const Exception & e)
    {
        ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_CANCELED);
        ASSERT_TRUE(e.message().find(error_message) != String::npos);
    }
}
|
||||
|
||||
// Jobs scheduled after their dependency has already been canceled are canceled as well (transitively).
TEST(AsyncLoader, ScheduleJobWithCanceledDependencies)
{
    AsyncLoaderTest t;

    // Cancel a job by removing its task before the loader is started.
    auto canceled_job_func = [&] (const LoadJobPtr &) {};
    auto canceled_job = makeLoadJob({}, "canceled_job", canceled_job_func);
    auto canceled_task = t.schedule({ canceled_job });
    canceled_task->remove();

    t.loader.start();

    auto noop = [&] (const LoadJobPtr &) {};
    auto job1 = makeLoadJob({ canceled_job }, "job1", noop);
    auto job2 = makeLoadJob({ job1 }, "job2", noop);
    auto task = t.schedule({ job1, job2 });

    t.loader.wait();

    ASSERT_EQ(job1->status(), LoadStatus::CANCELED);
    ASSERT_EQ(job2->status(), LoadStatus::CANCELED);

    // Direct dependent of the canceled job.
    try
    {
        job1->wait();
        FAIL();
    }
    catch (const Exception & e)
    {
        ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_CANCELED);
    }

    // Indirect dependent of the canceled job.
    try
    {
        job2->wait();
        FAIL();
    }
    catch (const Exception & e)
    {
        ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_CANCELED);
    }
}
|
||||
|
||||
// For every concurrency level N in [1, 10] schedules N independent chains of 5 jobs. Jobs rendezvous on a barrier
// of size N, so the test completes only if N jobs are really executed simultaneously.
TEST(AsyncLoader, TestConcurrency)
{
    AsyncLoaderTest t(10);
    t.loader.start();

    for (int concurrency = 1; concurrency <= 10; concurrency++)
    {
        std::barrier sync(concurrency);

        std::atomic<int> executing{0};
        auto job_func = [&] (const LoadJobPtr &)
        {
            executing++;
            // Non-fatal check on purpose: a fatal assertion would return from this lambda only, skipping both
            // the barrier arrival and the decrement below, so a failed check would end up as a hang of the other jobs.
            EXPECT_LE(executing, concurrency);
            sync.arrive_and_wait();
            executing--;
        };

        std::vector<LoadTaskPtr> tasks;
        tasks.reserve(concurrency);
        for (int i = 0; i < concurrency; i++)
            tasks.push_back(t.schedule(t.chainJobSet(5, job_func)));
        t.loader.wait();
        ASSERT_EQ(executing, 0);
    }
}
|
||||
|
||||
// Schedules more independent job chains (4..8) than there are worker threads (3) and checks
// that the number of simultaneously executing jobs never exceeds the thread limit.
TEST(AsyncLoader, TestOverload)
{
    AsyncLoaderTest t(3);
    t.loader.start();

    size_t max_threads = t.loader.getMaxThreads();
    std::atomic<int> executing{0};

    // The job function does not depend on the iteration, so it is defined once.
    auto job_func = [&] (const LoadJobPtr &)
    {
        executing++;
        t.randomSleepUs(100, 200, 100);
        ASSERT_LE(executing, max_threads);
        executing--;
    };

    for (int concurrency = 4; concurrency <= 8; concurrency++)
    {
        // Stop the loader while the tasks are being scheduled, so that all the chains start competing for workers at once.
        t.loader.stop();
        std::vector<LoadTaskPtr> tasks;
        tasks.reserve(concurrency);
        for (int i = 0; i < concurrency; i++)
            tasks.push_back(t.schedule(t.chainJobSet(5, job_func)));
        t.loader.start();
        t.loader.wait();
        ASSERT_EQ(executing, 0);
    }
}
|
||||
|
||||
// With a single worker the execution order is fully determined by priorities. Every job appends its name and
// its effective priority to `schedule`; dependencies of H (priority 9) are expected to inherit its priority.
TEST(AsyncLoader, StaticPriorities)
{
    AsyncLoaderTest t(1);

    std::string schedule;

    auto record_job = [&] (const LoadJobPtr & self)
    {
        schedule += fmt::format("{}{}", self->name, self->priority());
    };

    // The trailing number is the index of the job in `jobs`.
    std::vector<LoadJobPtr> jobs;
    jobs.push_back(makeLoadJob({}, 0, "A", record_job)); // 0
    jobs.push_back(makeLoadJob({ jobs[0] }, 3, "B", record_job)); // 1
    jobs.push_back(makeLoadJob({ jobs[0] }, 4, "C", record_job)); // 2
    jobs.push_back(makeLoadJob({ jobs[0] }, 1, "D", record_job)); // 3
    jobs.push_back(makeLoadJob({ jobs[0] }, 2, "E", record_job)); // 4
    jobs.push_back(makeLoadJob({ jobs[3], jobs[4] }, 0, "F", record_job)); // 5
    jobs.push_back(makeLoadJob({ jobs[5] }, 0, "G", record_job)); // 6
    jobs.push_back(makeLoadJob({ jobs[6] }, 9, "H", record_job)); // 7
    auto task = t.schedule({ jobs.begin(), jobs.end() });

    t.loader.start();
    t.loader.wait();

    ASSERT_EQ(schedule, "A9E9D9F9G9H9C4B3");
}
|
||||
|
||||
// Runs the same job graph twice on a single worker: without and with a `prioritize()` call issued from inside
// of a running job, and compares the recorded execution order (job name followed by its effective priority).
TEST(AsyncLoader, DynamicPriorities)
{
    AsyncLoaderTest t(1);

    for (bool prioritize : {false, true})
    {
        std::string schedule;

        LoadJobPtr job_to_prioritize;

        auto record_job = [&] (const LoadJobPtr & self)
        {
            if (prioritize && self->name == "C")
                t.loader.prioritize(job_to_prioritize, 9); // dynamic prioritization
            schedule += fmt::format("{}{}", self->name, self->priority());
        };

        // Job DAG with initial priorities. During execution of C4, job G0 priority is increased to G9, postponing B3 job executing.
        // A0 -+-> B3
        //     |
        //     `-> C4
        //     |
        //     `-> D1 -.
        //     |       +-> F0 --> G0 --> H0
        //     `-> E2 -'
        // The trailing number below is the index of the job in `jobs`.
        std::vector<LoadJobPtr> jobs;
        jobs.push_back(makeLoadJob({}, 0, "A", record_job)); // 0
        jobs.push_back(makeLoadJob({ jobs[0] }, 3, "B", record_job)); // 1
        jobs.push_back(makeLoadJob({ jobs[0] }, 4, "C", record_job)); // 2
        jobs.push_back(makeLoadJob({ jobs[0] }, 1, "D", record_job)); // 3
        jobs.push_back(makeLoadJob({ jobs[0] }, 2, "E", record_job)); // 4
        jobs.push_back(makeLoadJob({ jobs[3], jobs[4] }, 0, "F", record_job)); // 5
        jobs.push_back(makeLoadJob({ jobs[5] }, 0, "G", record_job)); // 6
        jobs.push_back(makeLoadJob({ jobs[6] }, 0, "H", record_job)); // 7
        auto task = t.schedule({ jobs.begin(), jobs.end() });

        job_to_prioritize = jobs[6];

        t.loader.start();
        t.loader.wait();
        t.loader.stop();

        if (!prioritize)
            ASSERT_EQ(schedule, "A4C4B3E2D1F0G0H0");
        else
            ASSERT_EQ(schedule, "A4C4E9D9F9G9B3H0");
    }
}
|
||||
|
||||
// Stress test: schedules 512 tasks with random internal dependencies while the loader is running.
// Every job checks that all of its dependencies have finished successfully before it was started.
TEST(AsyncLoader, RandomIndependentTasks)
{
    AsyncLoaderTest t(16);
    t.loader.start();

    auto checking_job = [&] (const LoadJobPtr & self)
    {
        for (const auto & dependency : self->dependencies)
            ASSERT_EQ(dependency->status(), LoadStatus::OK);
        t.randomSleepUs(100, 500, 5);
    };

    constexpr int task_count = 512;

    std::vector<LoadTaskPtr> tasks;
    tasks.reserve(task_count);
    for (int i = 0; i < task_count; i++)
    {
        int job_count = t.randomInt(1, 32);
        tasks.push_back(t.schedule(t.randomJobSet(job_count, 5, checking_job)));
        t.randomSleepUs(100, 900, 20); // avg=100us
    }
}
|
||||
|
||||
// Stress test: schedules 1000 tasks whose jobs may depend on jobs of the previously created tasks, while random
// old tasks are being destroyed concurrently with the execution. Scheduling is throttled to keep the number of
// scheduled jobs below 100.
TEST(AsyncLoader, RandomDependentTasks)
{
    AsyncLoaderTest t(16);
    t.loader.start();

    std::mutex mutex;
    std::condition_variable cv;
    std::vector<LoadTaskPtr> tasks;
    std::vector<LoadJobPtr> all_jobs;

    auto checking_job = [&] (const LoadJobPtr & self)
    {
        for (const auto & dependency : self->dependencies)
            ASSERT_EQ(dependency->status(), LoadStatus::OK);
        cv.notify_one(); // wake up the scheduling loop below
    };

    std::unique_lock lock{mutex};

    int tasks_left = 1000;
    tasks.reserve(tasks_left);
    while (tasks_left-- > 0)
    {
        // Throttle: do not schedule more until the loader has less than 100 scheduled jobs.
        cv.wait(lock, [&] { return t.loader.getScheduledJobCount() < 100; });

        // Add one new task
        int job_count = t.randomInt(1, 32);
        LoadJobSet new_jobs = t.randomJobSet(job_count, 5, all_jobs, checking_job);
        all_jobs.insert(all_jobs.end(), new_jobs.begin(), new_jobs.end());
        tasks.push_back(t.schedule(std::move(new_jobs)));

        // Cancel random old task
        if (tasks.size() > 100)
            tasks.erase(tasks.begin() + t.randomInt<size_t>(0, tasks.size() - 1));
    }

    t.loader.wait();
}
|
||||
|
||||
// Changes the thread limit of a running loader step by step and checks that on every step exactly
// `max_threads` jobs are executing simultaneously. Every step has its own barrier for `max_threads` jobs
// plus the main thread, which is passed twice: (A) to release the jobs and (B) after they stopped counting as executing.
TEST(AsyncLoader, SetMaxThreads)
{
    AsyncLoaderTest t(1);

    std::atomic<int> sync_index{0};
    std::atomic<int> executing{0};
    int max_threads_values[] = {1, 2, 3, 4, 5, 4, 3, 2, 1, 5, 10, 5, 1, 20, 1};
    std::vector<std::unique_ptr<std::barrier<>>> syncs;
    syncs.reserve(std::size(max_threads_values));
    for (int max_threads : max_threads_values)
        syncs.push_back(std::make_unique<std::barrier<>>(max_threads + 1));

    auto job_func = [&] (const LoadJobPtr &)
    {
        int step = sync_index;
        if (step >= syncs.size())
            return; // all the steps are done, the remaining jobs just finish
        executing++;
        syncs[step]->arrive_and_wait(); // (A)
        executing--;
        syncs[step]->arrive_and_wait(); // (B)
    };

    // Generate enough independent jobs
    for (int i = 0; i < 1000; i++)
        t.schedule({makeLoadJob({}, "job", job_func)})->detach();

    t.loader.start();
    while (sync_index < syncs.size())
    {
        // Wait for `max_threads` jobs to start executing
        int step = sync_index;
        while (executing.load() != max_threads_values[step])
        {
            ASSERT_LE(executing, max_threads_values[step]);
            std::this_thread::yield();
        }

        // Allow all jobs to finish
        syncs[step]->arrive_and_wait(); // (A)
        sync_index++;
        if (sync_index < syncs.size())
            t.loader.setMaxThreads(max_threads_values[sync_index]);
        syncs[step]->arrive_and_wait(); // (B) this sync point is required to allow `executing` value to go back down to zero after we change number of workers
    }
    t.loader.wait();
}
|
@ -25,18 +25,6 @@ namespace ErrorCodes
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
static constexpr size_t PRINT_MESSAGE_EACH_N_OBJECTS = 256;
|
||||
static constexpr size_t PRINT_MESSAGE_EACH_N_SECONDS = 5;
|
||||
|
||||
void logAboutProgress(Poco::Logger * log, size_t processed, size_t total, AtomicStopwatch & watch)
|
||||
{
|
||||
if (processed % PRINT_MESSAGE_EACH_N_OBJECTS == 0 || watch.compareAndRestart(PRINT_MESSAGE_EACH_N_SECONDS))
|
||||
{
|
||||
LOG_INFO(log, "{}%", processed * 100.0 / total);
|
||||
watch.restart();
|
||||
}
|
||||
}
|
||||
|
||||
TablesLoader::TablesLoader(ContextMutablePtr global_context_, Databases databases_, LoadingStrictnessLevel strictness_mode_)
|
||||
: global_context(global_context_)
|
||||
, databases(std::move(databases_))
|
||||
|
Loading…
Reference in New Issue
Block a user