diff --git a/src/Common/AsyncLoader.cpp b/src/Common/AsyncLoader.cpp new file mode 100644 index 00000000000..6c19b1910eb --- /dev/null +++ b/src/Common/AsyncLoader.cpp @@ -0,0 +1,640 @@ +#include + +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int ASYNC_LOAD_CYCLE; + extern const int ASYNC_LOAD_FAILED; + extern const int ASYNC_LOAD_CANCELED; +} + +static constexpr size_t PRINT_MESSAGE_EACH_N_OBJECTS = 256; +static constexpr size_t PRINT_MESSAGE_EACH_N_SECONDS = 5; + +void logAboutProgress(Poco::Logger * log, size_t processed, size_t total, AtomicStopwatch & watch) +{ + if (processed % PRINT_MESSAGE_EACH_N_OBJECTS == 0 || watch.compareAndRestart(PRINT_MESSAGE_EACH_N_SECONDS)) + { + LOG_INFO(log, "Processed: {}%", processed * 100.0 / total); + watch.restart(); + } +} + +LoadStatus LoadJob::status() const +{ + std::unique_lock lock{mutex}; + return load_status; +} + +std::exception_ptr LoadJob::exception() const +{ + std::unique_lock lock{mutex}; + return load_exception; +} + +ssize_t LoadJob::priority() const +{ + return load_priority; +} + +void LoadJob::wait() const +{ + std::unique_lock lock{mutex}; + waiters++; + finished.wait(lock, [this] { return load_status != LoadStatus::PENDING; }); + waiters--; + if (load_exception) + std::rethrow_exception(load_exception); +} + +void LoadJob::waitNoThrow() const noexcept +{ + std::unique_lock lock{mutex}; + waiters++; + finished.wait(lock, [this] { return load_status != LoadStatus::PENDING; }); + waiters--; +} + +size_t LoadJob::waitersCount() const +{ + std::unique_lock lock{mutex}; + return waiters; +} + +void LoadJob::ok() +{ + std::unique_lock lock{mutex}; + load_status = LoadStatus::OK; + finish(); +} + +void LoadJob::failed(const std::exception_ptr & ptr) +{ + std::unique_lock lock{mutex}; + load_status = LoadStatus::FAILED; + load_exception = ptr; + finish(); +} + +void LoadJob::canceled(const std::exception_ptr & ptr) +{ + std::unique_lock lock{mutex}; + load_status = LoadStatus::CANCELED; + load_exception = ptr; + finish(); +} + +void LoadJob::finish() +{ + func = {}; // To ensure job function is destructed before `AsyncLoader::wait()` and `LoadJob::wait()` return + finish_time = std::chrono::system_clock::now(); + if (waiters > 0) + finished.notify_all(); +} + +void LoadJob::scheduled() +{ + schedule_time = std::chrono::system_clock::now(); +} + +void LoadJob::enqueued() +{ + if (enqueue_time.load() == TimePoint{}) // Do not rewrite in case of requeue + enqueue_time = std::chrono::system_clock::now(); +} + +void LoadJob::execute(const LoadJobPtr & self) +{ + start_time = std::chrono::system_clock::now(); + func(self); +} + + +LoadTask::LoadTask(AsyncLoader & loader_, LoadJobSet && jobs_, LoadJobSet && goal_jobs_) + : loader(loader_) + , jobs(std::move(jobs_)) + , goal_jobs(std::move(goal_jobs_)) +{} + +LoadTask::~LoadTask() +{ + remove(); +} + +void LoadTask::merge(const LoadTaskPtr & task) +{ + chassert(&loader == &task->loader); + jobs.merge(task->jobs); + goal_jobs.merge(task->goal_jobs); +} + +void LoadTask::schedule() +{ + loader.schedule(*this); +} + +void LoadTask::remove() +{ + if (!jobs.empty()) + { + loader.remove(jobs); + jobs.clear(); + } +} + +void LoadTask::detach() +{ + jobs.clear(); +} + +AsyncLoader::AsyncLoader(Metric metric_threads, Metric metric_active_threads, size_t max_threads_, bool log_failures_, bool log_progress_) + : log_failures(log_failures_) + , log_progress(log_progress_) + , log(&Poco::Logger::get("AsyncLoader")) + , max_threads(max_threads_) + , pool(metric_threads, metric_active_threads, max_threads) +{ + +} + +AsyncLoader::~AsyncLoader() +{ + stop(); +} + +void AsyncLoader::start() +{ + std::unique_lock lock{mutex}; + is_running = true; + for (size_t i = 0; workers < max_threads && i < ready_queue.size(); i++) + spawn(lock); +} + +void AsyncLoader::wait() +{ + pool.wait(); +} + +void AsyncLoader::stop() +{ + { + std::unique_lock lock{mutex}; + is_running = false; + // NOTE: there is no need to notify because workers never wait + } + pool.wait(); +} + +void AsyncLoader::schedule(LoadTask & task) +{ + chassert(this == &task.loader); + scheduleImpl(task.jobs); +} + +void AsyncLoader::schedule(const LoadTaskPtr & task) +{ + chassert(this == &task->loader); + scheduleImpl(task->jobs); +} + +void AsyncLoader::schedule(const std::vector & tasks) +{ + LoadJobSet all_jobs; + for (const auto & task : tasks) + { + chassert(this == &task->loader); + all_jobs.insert(task->jobs.begin(), task->jobs.end()); + } + scheduleImpl(all_jobs); +} + +void AsyncLoader::scheduleImpl(const LoadJobSet & input_jobs) +{ + std::unique_lock lock{mutex}; + + // Restart watches after idle period + if (scheduled_jobs.empty()) + { + busy_period_start_time = std::chrono::system_clock::now(); + stopwatch.restart(); + old_jobs = finished_jobs.size(); + } + + // Make set of jobs to schedule: + // 1) exclude already scheduled or finished jobs + // 2) include pending dependencies, that are not yet scheduled + LoadJobSet jobs; + for (const auto & job : input_jobs) + gatherNotScheduled(job, jobs, lock); + + // Ensure scheduled_jobs graph will have no cycles. The only way to get a cycle is to add a cycle, assuming old jobs cannot reference new ones. + checkCycle(jobs, lock); + + // We do not want any exception to be throws after this point, because the following code is not exception-safe + DENY_ALLOCATIONS_IN_SCOPE; + + // Schedule all incoming jobs + for (const auto & job : jobs) + { + NOEXCEPT_SCOPE({ + ALLOW_ALLOCATIONS_IN_SCOPE; + scheduled_jobs.emplace(job, Info{.initial_priority = job->load_priority, .priority = job->load_priority}); + job->scheduled(); + }); + } + + // Process dependencies on scheduled pending jobs + for (const auto & job : jobs) + { + Info & info = scheduled_jobs.find(job)->second; + for (const auto & dep : job->dependencies) + { + // Register every dependency on scheduled job with back-link to dependent job + if (auto dep_info = scheduled_jobs.find(dep); dep_info != scheduled_jobs.end()) + { + NOEXCEPT_SCOPE({ + ALLOW_ALLOCATIONS_IN_SCOPE; + dep_info->second.dependent_jobs.insert(job); + }); + info.dependencies_left++; + + // Priority inheritance: prioritize deps to have at least given `priority` to avoid priority inversion + prioritize(dep, info.priority, lock); + } + } + + // Enqueue non-blocked jobs (w/o dependencies) to ready queue + if (!info.is_blocked()) + enqueue(info, job, lock); + } + + // Process dependencies on other jobs. It is done in a separate pass to facilitate propagation of cancel signals (if any). + for (const auto & job : jobs) + { + if (auto info = scheduled_jobs.find(job); info != scheduled_jobs.end()) + { + for (const auto & dep : job->dependencies) + { + if (scheduled_jobs.contains(dep)) + continue; // Skip dependencies on scheduled pending jobs (already processed) + LoadStatus dep_status = dep->status(); + if (dep_status == LoadStatus::OK) + continue; // Dependency on already successfully finished job -- it's okay. + + // Dependency on not scheduled pending job -- it's bad. + // Probably, there is an error in `jobs` set, `gatherNotScheduled()` should have fixed it. + chassert(dep_status != LoadStatus::PENDING); + + if (dep_status == LoadStatus::FAILED || dep_status == LoadStatus::CANCELED) + { + // Dependency on already failed or canceled job -- it's okay. Cancel all dependent jobs. + std::exception_ptr e; + NOEXCEPT_SCOPE({ + ALLOW_ALLOCATIONS_IN_SCOPE; + e = std::make_exception_ptr(Exception(ErrorCodes::ASYNC_LOAD_CANCELED, + "Load job '{}' -> {}", + job->name, + getExceptionMessage(dep->exception(), /* with_stacktrace = */ false))); + }); + finish(lock, job, LoadStatus::CANCELED, e); + break; // This job is now finished, stop its dependencies processing + } + } + } + else + { + // Job was already canceled on previous iteration of this cycle -- skip + } + } +} + +void AsyncLoader::gatherNotScheduled(const LoadJobPtr & job, LoadJobSet & jobs, std::unique_lock & lock) +{ + if (job->status() == LoadStatus::PENDING && !scheduled_jobs.contains(job) && !jobs.contains(job)) + { + jobs.insert(job); + for (const auto & dep : job->dependencies) + gatherNotScheduled(dep, jobs, lock); + } +} + +void AsyncLoader::prioritize(const LoadJobPtr & job, ssize_t new_priority) +{ + if (!job) + return; + DENY_ALLOCATIONS_IN_SCOPE; + std::unique_lock lock{mutex}; + prioritize(job, new_priority, lock); +} + +void AsyncLoader::remove(const LoadJobSet & jobs) +{ + DENY_ALLOCATIONS_IN_SCOPE; + std::unique_lock lock{mutex}; + // On the first pass: + // - cancel all not executing jobs to avoid races + // - do not wait executing jobs (otherwise, on unlock a worker could start executing a dependent job, that should be canceled) + for (const auto & job : jobs) + { + if (auto info = scheduled_jobs.find(job); info != scheduled_jobs.end()) + { + if (info->second.is_executing()) + continue; // Skip executing jobs on the first pass + std::exception_ptr e; + NOEXCEPT_SCOPE({ + ALLOW_ALLOCATIONS_IN_SCOPE; + e = std::make_exception_ptr(Exception(ErrorCodes::ASYNC_LOAD_CANCELED, "Load job '{}' canceled", job->name)); + }); + finish(lock, job, LoadStatus::CANCELED, e); + } + } + // On the second pass wait for executing jobs to finish + for (const auto & job : jobs) + { + if (auto info = scheduled_jobs.find(job); info != scheduled_jobs.end()) + { + // Job is currently executing + chassert(info->second.is_executing()); + lock.unlock(); + job->waitNoThrow(); // Wait for job to finish + lock.lock(); + } + } + // On the third pass all jobs are finished - remove them all + // It is better to do it under one lock to avoid exposing intermediate states + for (const auto & job : jobs) + { + size_t erased = finished_jobs.erase(job); + if (old_jobs >= erased && job->finishTime() != LoadJob::TimePoint{} && job->finishTime() < busy_period_start_time) + old_jobs -= erased; + } +} + +void AsyncLoader::setMaxThreads(size_t value) +{ + std::unique_lock lock{mutex}; + pool.setMaxThreads(value); + pool.setMaxFreeThreads(value); + pool.setQueueSize(value); + max_threads = value; + if (!is_running) + return; + for (size_t i = 0; workers < max_threads && i < ready_queue.size(); i++) + spawn(lock); +} + +size_t AsyncLoader::getMaxThreads() const +{ + std::unique_lock lock{mutex}; + return max_threads; +} + +size_t AsyncLoader::getScheduledJobCount() const +{ + std::unique_lock lock{mutex}; + return scheduled_jobs.size(); +} + +std::vector AsyncLoader::getJobStates() const +{ + std::unique_lock lock{mutex}; + std::multimap states; + for (const auto & [job, info] : scheduled_jobs) + states.emplace(job->name, JobState{ + .job = job, + .dependencies_left = info.dependencies_left, + .is_executing = info.is_executing(), + .is_blocked = info.is_blocked(), + .is_ready = info.is_ready(), + .initial_priority = info.initial_priority, + .ready_seqno = last_ready_seqno + }); + for (const auto & job : finished_jobs) + states.emplace(job->name, JobState{.job = job}); + lock.unlock(); + std::vector result; + for (auto && [_, state] : states) + result.emplace_back(std::move(state)); + return result; +} + +void AsyncLoader::checkCycle(const LoadJobSet & jobs, std::unique_lock & lock) +{ + LoadJobSet left = jobs; + LoadJobSet visited; + visited.reserve(left.size()); + while (!left.empty()) + { + LoadJobPtr job = *left.begin(); + checkCycleImpl(job, left, visited, lock); + } +} + +String AsyncLoader::checkCycleImpl(const LoadJobPtr & job, LoadJobSet & left, LoadJobSet & visited, std::unique_lock & lock) +{ + if (!left.contains(job)) + return {}; // Do not consider external dependencies and already processed jobs + if (auto [_, inserted] = visited.insert(job); !inserted) + { + visited.erase(job); // Mark where cycle ends + return job->name; + } + for (const auto & dep : job->dependencies) + { + if (auto chain = checkCycleImpl(dep, left, visited, lock); !chain.empty()) + { + if (!visited.contains(job)) // Check for cycle end + throw Exception(ErrorCodes::ASYNC_LOAD_CYCLE, "Load job dependency cycle detected: {} -> {}", job->name, chain); + else + return fmt::format("{} -> {}", job->name, chain); // chain is not a cycle yet -- continue building + } + } + left.erase(job); + return {}; +} + +void AsyncLoader::finish(std::unique_lock & lock, const LoadJobPtr & job, LoadStatus status, std::exception_ptr exception_from_job) +{ + if (status == LoadStatus::OK) + { + // Notify waiters + job->ok(); + + // Update dependent jobs and enqueue if ready + chassert(scheduled_jobs.contains(job)); // Job was pending + for (const auto & dep : scheduled_jobs[job].dependent_jobs) + { + chassert(scheduled_jobs.contains(dep)); // All depended jobs must be pending + Info & dep_info = scheduled_jobs[dep]; + dep_info.dependencies_left--; + if (!dep_info.is_blocked()) + enqueue(dep_info, dep, lock); + } + } + else + { + // Notify waiters + if (status == LoadStatus::FAILED) + job->failed(exception_from_job); + else if (status == LoadStatus::CANCELED) + job->canceled(exception_from_job); + + chassert(scheduled_jobs.contains(job)); // Job was pending + Info & info = scheduled_jobs[job]; + if (info.is_ready()) + { + ready_queue.erase(info.key()); + info.ready_seqno = 0; + } + + // Recurse into all dependent jobs + LoadJobSet dependent; + dependent.swap(info.dependent_jobs); // To avoid container modification during recursion + for (const auto & dep : dependent) + { + if (!scheduled_jobs.contains(dep)) + continue; // Job has already been canceled + std::exception_ptr e; + NOEXCEPT_SCOPE({ + ALLOW_ALLOCATIONS_IN_SCOPE; + e = std::make_exception_ptr( + Exception(ErrorCodes::ASYNC_LOAD_CANCELED, + "Load job '{}' -> {}", + dep->name, + getExceptionMessage(exception_from_job, /* with_stacktrace = */ false))); + }); + finish(lock, dep, LoadStatus::CANCELED, e); + } + + // Clean dependency graph edges pointing to canceled jobs + for (const auto & dep : job->dependencies) + if (auto dep_info = scheduled_jobs.find(dep); dep_info != scheduled_jobs.end()) + dep_info->second.dependent_jobs.erase(job); + } + + // Job became finished + scheduled_jobs.erase(job); + NOEXCEPT_SCOPE({ + ALLOW_ALLOCATIONS_IN_SCOPE; + finished_jobs.insert(job); + if (log_progress) + logAboutProgress(log, finished_jobs.size() - old_jobs, finished_jobs.size() + scheduled_jobs.size() - old_jobs, stopwatch); + }); +} + +void AsyncLoader::prioritize(const LoadJobPtr & job, ssize_t new_priority, std::unique_lock & lock) +{ + if (auto info = scheduled_jobs.find(job); info != scheduled_jobs.end()) + { + if (info->second.priority >= new_priority) + return; // Never lower priority + + // Update priority and push job forward through ready queue if needed + if (info->second.ready_seqno) + ready_queue.erase(info->second.key()); + info->second.priority = new_priority; + job->load_priority.store(new_priority); // Set user-facing priority (may affect executing jobs) + if (info->second.ready_seqno) + { + NOEXCEPT_SCOPE({ + ALLOW_ALLOCATIONS_IN_SCOPE; + ready_queue.emplace(info->second.key(), job); + }); + } + + // Recurse into dependencies + for (const auto & dep : job->dependencies) + prioritize(dep, new_priority, lock); + } +} + +void AsyncLoader::enqueue(Info & info, const LoadJobPtr & job, std::unique_lock & lock) +{ + chassert(!info.is_blocked()); + chassert(info.ready_seqno == 0); + info.ready_seqno = ++last_ready_seqno; + NOEXCEPT_SCOPE({ + ALLOW_ALLOCATIONS_IN_SCOPE; + ready_queue.emplace(info.key(), job); + }); + + job->enqueued(); + + if (is_running && workers < max_threads) + spawn(lock); +} + +void AsyncLoader::spawn(std::unique_lock &) +{ + workers++; + NOEXCEPT_SCOPE({ + ALLOW_ALLOCATIONS_IN_SCOPE; + pool.scheduleOrThrowOnError([this] { worker(); }); + }); +} + +void AsyncLoader::worker() +{ + DENY_ALLOCATIONS_IN_SCOPE; + + LoadJobPtr job; + std::exception_ptr exception_from_job; + while (true) + { + // This is inside the loop to also reset previous thread names set inside the jobs + setThreadName("AsyncLoader"); + + { + std::unique_lock lock{mutex}; + + // Handle just executed job + if (exception_from_job) + finish(lock, job, LoadStatus::FAILED, exception_from_job); + else if (job) + finish(lock, job, LoadStatus::OK); + + if (!is_running || ready_queue.empty() || workers > max_threads) + { + workers--; + return; + } + + // Take next job to be executed from the ready queue + auto it = ready_queue.begin(); + job = it->second; + ready_queue.erase(it); + scheduled_jobs.find(job)->second.ready_seqno = 0; // This job is no longer in the ready queue + } + + ALLOW_ALLOCATIONS_IN_SCOPE; + + try + { + job->execute(job); + exception_from_job = {}; + } + catch (...) + { + NOEXCEPT_SCOPE({ + if (log_failures) + tryLogCurrentException(__PRETTY_FUNCTION__); + exception_from_job = std::make_exception_ptr( + Exception(ErrorCodes::ASYNC_LOAD_FAILED, + "Load job '{}' failed: {}", + job->name, + getCurrentExceptionMessage(/* with_stacktrace = */ true))); + }); + } + } +} + +} diff --git a/src/Common/AsyncLoader.h b/src/Common/AsyncLoader.h new file mode 100644 index 00000000000..d4a3218a541 --- /dev/null +++ b/src/Common/AsyncLoader.h @@ -0,0 +1,439 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +namespace Poco { class Logger; } + +namespace DB +{ + +class LoadJob; +using LoadJobPtr = std::shared_ptr; +using LoadJobSet = std::unordered_set; +class LoadTask; +using LoadTaskPtr = std::shared_ptr; +using LoadTaskPtrs = std::vector; +class AsyncLoader; + +void logAboutProgress(Poco::Logger * log, size_t processed, size_t total, AtomicStopwatch & watch); + +// Execution status of a load job. +enum class LoadStatus +{ + PENDING, // Load job is not started yet. + OK, // Load job executed and was successful. + FAILED, // Load job executed and failed. + CANCELED // Load job is not going to be executed due to removal or dependency failure. +}; + +// Smallest indivisible part of a loading process. Load job can have multiple dependencies, thus jobs constitute a direct acyclic graph (DAG). +// Job encapsulates a function to be executed by `AsyncLoader` as soon as job functions of all dependencies are successfully executed. +// Job can be waited for by an arbitrary number of threads. See `AsyncLoader` class description for more details. +class LoadJob : private boost::noncopyable +{ +public: + template + LoadJob(LoadJobSetType && dependencies_, String name_, Func && func_, ssize_t priority_ = 0) + : dependencies(std::forward(dependencies_)) + , name(std::move(name_)) + , func(std::forward(func_)) + , load_priority(priority_) + {} + + // Current job status. + LoadStatus status() const; + std::exception_ptr exception() const; + + // Returns current value of a priority of the job. May differ from initial priority. + ssize_t priority() const; + + // Sync wait for a pending job to be finished: OK, FAILED or CANCELED status. + // Throws if job is FAILED or CANCELED. Returns or throws immediately on non-pending job. + void wait() const; + + // Wait for a job to reach any non PENDING status. + void waitNoThrow() const noexcept; + + // Returns number of threads blocked by `wait()` or `waitNoThrow()` calls. + size_t waitersCount() const; + + // Introspection + using TimePoint = std::chrono::system_clock::time_point; + TimePoint scheduleTime() const { return schedule_time; } + TimePoint enqueueTime() const { return enqueue_time; } + TimePoint startTime() const { return start_time; } + TimePoint finishTime() const { return finish_time; } + + const LoadJobSet dependencies; // Jobs to be done before this one (with ownership), it is `const` to make creation of cycles hard + const String name; + +private: + friend class AsyncLoader; + + void ok(); + void failed(const std::exception_ptr & ptr); + void canceled(const std::exception_ptr & ptr); + void finish(); + + void scheduled(); + void enqueued(); + void execute(const LoadJobPtr & self); + + std::function func; + std::atomic load_priority; + + mutable std::mutex mutex; + mutable std::condition_variable finished; + mutable size_t waiters = 0; + LoadStatus load_status{LoadStatus::PENDING}; + std::exception_ptr load_exception; + + std::atomic schedule_time{TimePoint{}}; + std::atomic enqueue_time{TimePoint{}}; + std::atomic start_time{TimePoint{}}; + std::atomic finish_time{TimePoint{}}; +}; + +struct EmptyJobFunc +{ + void operator()(const LoadJobPtr &) {} +}; + +template +LoadJobPtr makeLoadJob(LoadJobSet && dependencies, String name, Func && func = EmptyJobFunc()) +{ + return std::make_shared(std::move(dependencies), std::move(name), std::forward(func)); +} + +template +LoadJobPtr makeLoadJob(const LoadJobSet & dependencies, String name, Func && func = EmptyJobFunc()) +{ + return std::make_shared(dependencies, std::move(name), std::forward(func)); +} + +template +LoadJobPtr makeLoadJob(LoadJobSet && dependencies, ssize_t priority, String name, Func && func = EmptyJobFunc()) +{ + return std::make_shared(std::move(dependencies), std::move(name), std::forward(func), priority); +} + +template +LoadJobPtr makeLoadJob(const LoadJobSet & dependencies, ssize_t priority, String name, Func && func = EmptyJobFunc()) +{ + return std::make_shared(dependencies, std::move(name), std::forward(func), priority); +} + +// Represents a logically connected set of LoadJobs required to achieve some goals (final LoadJob in the set). +class LoadTask : private boost::noncopyable +{ +public: + LoadTask(AsyncLoader & loader_, LoadJobSet && jobs_, LoadJobSet && goal_jobs_ = {}); + ~LoadTask(); + + // Merge all jobs from other task into this task. + void merge(const LoadTaskPtr & task); + + // Schedule all jobs with AsyncLoader. + void schedule(); + + // Remove all jobs of this task from AsyncLoader. + void remove(); + + // Do not track jobs in this task. + // WARNING: Jobs will never be removed() and are going to be stored as finished jobs until ~AsyncLoader(). + void detach(); + + // Return the final jobs in this tasks. This job subset should be used as `dependencies` for dependent jobs or tasks: + // auto load_task = loadSomethingAsync(async_loader, load_after_task.goals(), something); + const LoadJobSet & goals() const { return goal_jobs.empty() ? jobs : goal_jobs; } + +private: + friend class AsyncLoader; + + AsyncLoader & loader; + LoadJobSet jobs; + LoadJobSet goal_jobs; +}; + +inline LoadTaskPtr makeLoadTask(AsyncLoader & loader, LoadJobSet && jobs, LoadJobSet && goals = {}) +{ + return std::make_shared(loader, std::move(jobs), std::move(goals)); +} + +inline void scheduleLoad(const LoadTaskPtr & task) +{ + task->schedule(); +} + +inline void scheduleLoad(const LoadTaskPtrs & tasks) +{ + for (const auto & task : tasks) + task->schedule(); +} + +template +inline void scheduleLoad(Args && ... args) +{ + (scheduleLoad(std::forward(args)), ...); +} + +inline void waitLoad(const LoadJobSet & jobs) +{ + for (const auto & job : jobs) + job->wait(); +} + +inline void waitLoad(const LoadTaskPtr & task) +{ + waitLoad(task->goals()); +} + +inline void waitLoad(const LoadTaskPtrs & tasks) +{ + for (const auto & task : tasks) + waitLoad(task->goals()); +} + +template +inline void waitLoad(Args && ... args) +{ + (waitLoad(std::forward(args)), ...); +} + +template +inline void scheduleAndWaitLoad(Args && ... args) +{ + scheduleLoad(std::forward(args)...); + waitLoad(std::forward(args)...); +} + +inline LoadJobSet getGoals(const LoadTaskPtrs & tasks) +{ + LoadJobSet result; + for (const auto & task : tasks) + result.insert(task->goals().begin(), task->goals().end()); + return result; +} + +inline LoadJobSet joinJobs(const LoadJobSet & jobs1, const LoadJobSet & jobs2) +{ + LoadJobSet result; + if (!jobs1.empty()) + result.insert(jobs1.begin(), jobs1.end()); + if (!jobs2.empty()) + result.insert(jobs2.begin(), jobs2.end()); + return result; +} + +inline LoadTaskPtrs joinTasks(const LoadTaskPtrs & tasks1, const LoadTaskPtrs & tasks2) +{ + if (tasks1.empty()) + return tasks2; + if (tasks2.empty()) + return tasks1; + LoadTaskPtrs result; + result.reserve(tasks1.size() + tasks2.size()); + result.insert(result.end(), tasks1.begin(), tasks1.end()); + result.insert(result.end(), tasks2.begin(), tasks2.end()); + return result; +} + +// `AsyncLoader` is a scheduler for DAG of `LoadJob`s. It tracks dependencies and priorities of jobs. +// Basic usage example: +// auto job_func = [&] (const LoadJobPtr & self) { +// LOG_TRACE(log, "Executing load job '{}' with priority '{}'", self->name, self->priority()); +// }; +// auto job1 = makeLoadJob({}, "job1", job_func); +// auto job2 = makeLoadJob({ job1 }, "job2", job_func); +// auto job3 = makeLoadJob({ job1 }, "job3", job_func); +// auto task = makeLoadTask(async_loader, { job1, job2, job3 }); +// task.schedule(); +// Here we have created and scheduled a task consisting of three jobs. Job1 has no dependencies and is run first. +// Job2 and job3 depend on job1 and are run only after job1 completion. Another thread may prioritize a job and wait for it: +// async_loader->prioritize(job3, /* priority = */ 1); // higher priority jobs are run first, default priority is zero. +// job3->wait(); // blocks until job completion or cancellation and rethrow an exception (if any) +// +// AsyncLoader tracks state of all scheduled jobs. Job lifecycle is the following: +// 1) Job is constructed with PENDING status and initial priority. The job is placed into a task. +// 2) The task is scheduled with all its jobs and their dependencies. A scheduled job may be ready (i.e. have all its dependencies finished) or blocked. +// 3a) When all dependencies are successfully executed, the job became ready. A ready job is enqueued into the ready queue. +// 3b) If at least one of the job dependencies is failed or canceled, then this job is canceled (with all it's dependent jobs as well). +// On cancellation an ASYNC_LOAD_CANCELED exception is generated and saved inside LoadJob object. The job status is changed to CANCELED. +// Exception is rethrown by any existing or new `wait()` call. The job is moved to the set of the finished jobs. +// 4) The scheduled pending ready job starts execution by a worker. The job is dequeued. Callback `job_func` is called. +// Status of an executing job is PENDING. And it is still considered as a scheduled job by AsyncLoader. +// Note that `job_func` of a CANCELED job is never executed. +// 5a) On successful execution the job status is changed to OK and all existing and new `wait()` calls finish w/o exceptions. +// 5b) Any exception thrown out of `job_func` is wrapped into an ASYNC_LOAD_FAILED exception and saved inside LoadJob. +// The job status is changed to FAILED. All the dependent jobs are canceled. The exception is rethrown from all existing and new `wait()` calls. +// 6) The job is no longer considered as scheduled and is instead moved to the finished jobs set. This is just for introspection of the finished jobs. +// 7) The task containing this job is destructed or `remove()` is explicitly called. The job is removed from the finished job set. +// 8) The job is destructed. +// +// Every job has a priority associated with it. AsyncLoader runs higher priority (greater `priority` value) jobs first. Job priority can be elevated +// (a) if either it has a dependent job with higher priority (in this case priority of a dependent job is inherited); +// (b) or job was explicitly prioritized by `prioritize(job, higher_priority)` call (this also leads to a priority inheritance for all the dependencies). +// Note that to avoid priority inversion `job_func` should use `self->priority()` to schedule new jobs in AsyncLoader or any other pool. +// Value stored in load job priority field is atomic and can be increased even during job execution. +// +// When a task is scheduled it can contain dependencies on previously scheduled jobs. These jobs can have any status. If job A being scheduled depends on +// another job B that is not yet scheduled, then job B will also be scheduled (even if the task does not contain it). +class AsyncLoader : private boost::noncopyable +{ +private: + // Key of a pending job in the ready queue. + struct ReadyKey + { + ssize_t priority; // Ascending order + ssize_t initial_priority; // Ascending order + UInt64 ready_seqno; // Descending order + + bool operator<(const ReadyKey & rhs) const + { + if (priority > rhs.priority) + return true; + if (priority < rhs.priority) + return false; + if (initial_priority > rhs.initial_priority) + return true; + if (initial_priority < rhs.initial_priority) + return false; + return ready_seqno < rhs.ready_seqno; + } + }; + + // Scheduling information for a pending job. + struct Info + { + ssize_t initial_priority = 0; // Initial priority passed into schedule(). + ssize_t priority = 0; // Elevated priority, due to priority inheritance or prioritize(). + size_t dependencies_left = 0; // Current number of dependencies on pending jobs. + UInt64 ready_seqno = 0; // Zero means that job is not in ready queue. + LoadJobSet dependent_jobs; // Set of jobs dependent on this job. + + // Three independent states of a non-finished job. + bool is_blocked() const { return dependencies_left > 0; } + bool is_ready() const { return dependencies_left == 0 && ready_seqno > 0; } + bool is_executing() const { return dependencies_left == 0 && ready_seqno == 0; } + + // Get key of a ready job + ReadyKey key() const + { + return {.priority = priority, .initial_priority = initial_priority, .ready_seqno = ready_seqno}; + } + }; + +public: + using Metric = CurrentMetrics::Metric; + + AsyncLoader(Metric metric_threads, Metric metric_active_threads, size_t max_threads_, bool log_failures_, bool log_progress_); + + // WARNING: all tasks instances should be destructed before associated AsyncLoader. + ~AsyncLoader(); + + // Start workers to execute scheduled load jobs. + void start(); + + // Wait for all load jobs to finish, including all new jobs. So at first take care to stop adding new jobs. + void wait(); + + // Wait for currently executing jobs to finish, but do not run any other pending jobs. + // Not finished jobs are left in pending state: + // - they can be executed by calling start() again; + // - or canceled using ~Task() or remove() later. + void stop(); + + // Schedule all jobs of given `task` and their dependencies (if any, not scheduled yet). + // Higher priority jobs (with greater `job->priority()` value) are executed earlier. + // All dependencies of a scheduled job inherit its priority if it is higher. This way higher priority job + // never wait for (blocked by) lower priority jobs. No priority inversion is possible. + // Note that `task` destructor ensures that all its jobs are finished (OK, FAILED or CANCELED) + // and are removed from AsyncLoader, so it is thread-safe to destroy them. + void schedule(LoadTask & task); + void schedule(const LoadTaskPtr & task); + + // Schedule all tasks atomically. To ensure only highest priority jobs among all tasks are run first. + void schedule(const std::vector & tasks); + + // Increase priority of a job and all its dependencies recursively. + void prioritize(const LoadJobPtr & job, ssize_t new_priority); + + // Remove finished jobs, cancel scheduled jobs, wait for executing jobs to finish and remove them. + void remove(const LoadJobSet & jobs); + + // Increase or decrease maximum number of simultaneously executing jobs. + void setMaxThreads(size_t value); + + size_t getMaxThreads() const; + size_t getScheduledJobCount() const; + + // Helper class for introspection + struct JobState + { + LoadJobPtr job; + size_t dependencies_left = 0; + bool is_executing = false; + bool is_blocked = false; + bool is_ready = false; + std::optional initial_priority; + std::optional ready_seqno; + }; + + // For introspection and debug only, see `system.async_loader` table + std::vector getJobStates() const; + +private: + void checkCycle(const LoadJobSet & jobs, std::unique_lock & lock); + String checkCycleImpl(const LoadJobPtr & job, LoadJobSet & left, LoadJobSet & visited, std::unique_lock & lock); + void finish(std::unique_lock & lock, const LoadJobPtr & job, LoadStatus status, std::exception_ptr exception_from_job = {}); + void scheduleImpl(const LoadJobSet & input_jobs); + void gatherNotScheduled(const LoadJobPtr & job, LoadJobSet & jobs, std::unique_lock & lock); + void prioritize(const LoadJobPtr & job, ssize_t new_priority, std::unique_lock & lock); + void enqueue(Info & info, const LoadJobPtr & job, std::unique_lock & lock); + void spawn(std::unique_lock &); + void worker(); + + // Logging + const bool log_failures; // Worker should log all exceptions caught from job functions. + const bool log_progress; // Periodically log total progress + Poco::Logger * log; + std::chrono::system_clock::time_point busy_period_start_time; + AtomicStopwatch stopwatch; + size_t old_jobs = 0; // Number of jobs that were finished in previous busy period (for correct progress indication) + + mutable std::mutex mutex; // Guards all the fields below. + bool is_running = false; + + // Full set of scheduled pending jobs along with scheduling info. + std::unordered_map scheduled_jobs; + + // Subset of scheduled pending non-blocked jobs (waiting for a worker to be executed). + // Represent a queue of jobs in order of decreasing priority and FIFO for jobs with equal priorities. + std::map ready_queue; + + // Set of finished jobs (for introspection only, until jobs are removed). + LoadJobSet finished_jobs; + + // Increasing counter for `ReadyKey` assignment (to preserve FIFO order of the jobs with equal priorities). + UInt64 last_ready_seqno = 0; + + // For executing jobs. Note that we avoid using an internal queue of the pool to be able to prioritize jobs. + size_t max_threads; + size_t workers = 0; + ThreadPool pool; +}; + +} diff --git a/src/Common/ErrorCodes.cpp b/src/Common/ErrorCodes.cpp index e9dc5649245..24e6114b26c 100644 --- a/src/Common/ErrorCodes.cpp +++ b/src/Common/ErrorCodes.cpp @@ -576,6 +576,9 @@ M(691, UNKNOWN_ELEMENT_OF_ENUM) \ M(692, TOO_MANY_MUTATIONS) \ M(693, AWS_ERROR) \ + M(694, ASYNC_LOAD_CYCLE) \ + M(695, ASYNC_LOAD_FAILED) \ + M(696, ASYNC_LOAD_CANCELED) \ \ M(999, KEEPER_EXCEPTION) \ M(1000, POCO_EXCEPTION) \ diff --git a/src/Common/tests/gtest_async_loader.cpp b/src/Common/tests/gtest_async_loader.cpp new file mode 100644 index 00000000000..5666c4b923e --- /dev/null +++ b/src/Common/tests/gtest_async_loader.cpp @@ -0,0 +1,749 @@ +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +using namespace DB; + +namespace CurrentMetrics +{ + extern const Metric TablesLoaderThreads; + extern const Metric TablesLoaderThreadsActive; +} + +namespace DB::ErrorCodes +{ + extern const int ASYNC_LOAD_CYCLE; + extern const int ASYNC_LOAD_FAILED; + extern const int ASYNC_LOAD_CANCELED; +} + +struct AsyncLoaderTest +{ + AsyncLoader loader; + + std::mutex rng_mutex; + pcg64 rng{randomSeed()}; + + explicit AsyncLoaderTest(size_t max_threads = 1) + : loader(CurrentMetrics::TablesLoaderThreads, CurrentMetrics::TablesLoaderThreadsActive, max_threads, /* log_failures = */ false, /* log_progress = */ false) + {} + + template + T randomInt(T from, T to) + { + std::uniform_int_distribution distribution(from, to); + std::scoped_lock lock(rng_mutex); + return distribution(rng); + } + + void randomSleepUs(UInt64 min_us, UInt64 max_us, int probability_percent) + { + if (randomInt(0, 99) < probability_percent) + std::this_thread::sleep_for(std::chrono::microseconds(randomInt(min_us, max_us))); + } + + template + LoadJobSet randomJobSet(int job_count, int dep_probability_percent, JobFunc job_func, std::string_view name_prefix = "job") + { + std::vector jobs; + jobs.reserve(job_count); + for (int j = 0; j < job_count; j++) + { + LoadJobSet deps; + for (int d = 0; d < j; d++) + { + if (randomInt(0, 99) < dep_probability_percent) + deps.insert(jobs[d]); + } + jobs.push_back(makeLoadJob(std::move(deps), fmt::format("{}{}", name_prefix, j), job_func)); + } + return {jobs.begin(), jobs.end()}; + } + + template + LoadJobSet randomJobSet(int job_count, int dep_probability_percent, const std::vector & external_deps, JobFunc job_func, std::string_view name_prefix = "job") + { + std::vector jobs; + jobs.reserve(job_count); + for (int j = 0; j < job_count; j++) + { + LoadJobSet deps; + for (int d = 0; d < j; d++) + { + if (randomInt(0, 99) < dep_probability_percent) + deps.insert(jobs[d]); + } + if (!external_deps.empty() && randomInt(0, 99) < dep_probability_percent) + deps.insert(external_deps[randomInt(0, external_deps.size() - 1)]); + jobs.push_back(makeLoadJob(std::move(deps), fmt::format("{}{}", name_prefix, j), job_func)); + } + return {jobs.begin(), jobs.end()}; + } + + template + LoadJobSet chainJobSet(int job_count, JobFunc job_func, std::string_view name_prefix = "job") + { + std::vector jobs; + jobs.reserve(job_count); + jobs.push_back(makeLoadJob({}, fmt::format("{}{}", name_prefix, 0), job_func)); + for (int j = 1; j < job_count; j++) + jobs.push_back(makeLoadJob({ jobs[j - 1] }, fmt::format("{}{}", name_prefix, j), job_func)); + return {jobs.begin(), jobs.end()}; + } + + LoadTaskPtr schedule(LoadJobSet && jobs) + { + LoadTaskPtr task = makeLoadTask(loader, std::move(jobs)); + task->schedule(); + return task; + } +}; + +TEST(AsyncLoader, Smoke) +{ + AsyncLoaderTest t(2); + + static constexpr ssize_t low_priority = -1; + + std::atomic jobs_done{0}; + std::atomic low_priority_jobs_done{0}; + + auto job_func = [&] (const LoadJobPtr & self) { + jobs_done++; + if (self->priority() == low_priority) + low_priority_jobs_done++; + }; + + { + auto job1 = makeLoadJob({}, "job1", job_func); + auto job2 = makeLoadJob({ job1 }, "job2", job_func); + auto task1 = t.schedule({ job1, job2 }); + + auto job3 = makeLoadJob({ job2 }, "job3", job_func); + auto job4 = makeLoadJob({ job2 }, "job4", job_func); + auto task2 = t.schedule({ job3, job4 }); + auto job5 = makeLoadJob({ job3, job4 }, low_priority, "job5", job_func); + task2->merge(t.schedule({ job5 })); + + std::thread waiter_thread([=] { job5->wait(); }); + + t.loader.start(); + + job3->wait(); + t.loader.wait(); + job4->wait(); + + waiter_thread.join(); + + ASSERT_EQ(job1->status(), LoadStatus::OK); + ASSERT_EQ(job2->status(), LoadStatus::OK); + } + + ASSERT_EQ(jobs_done, 5); + ASSERT_EQ(low_priority_jobs_done, 1); + + t.loader.stop(); +} + +TEST(AsyncLoader, CycleDetection) +{ + AsyncLoaderTest t; + + auto job_func = [&] (const LoadJobPtr &) {}; + + LoadJobPtr cycle_breaker; // To avoid memleak we introduce with a cycle + + try + { + std::vector jobs; + jobs.reserve(16); + jobs.push_back(makeLoadJob({}, "job0", job_func)); + jobs.push_back(makeLoadJob({ jobs[0] }, "job1", job_func)); + jobs.push_back(makeLoadJob({ jobs[0], jobs[1] }, "job2", job_func)); + jobs.push_back(makeLoadJob({ jobs[0], jobs[2] }, "job3", job_func)); + + // Actually it is hard to construct a cycle, but suppose someone was able to succeed violating constness + const_cast(jobs[1]->dependencies).insert(jobs[3]); + cycle_breaker = jobs[1]; + + // Add couple unrelated jobs + jobs.push_back(makeLoadJob({ jobs[1] }, "job4", job_func)); + jobs.push_back(makeLoadJob({ jobs[4] }, "job5", job_func)); + jobs.push_back(makeLoadJob({ jobs[3] }, "job6", job_func)); + jobs.push_back(makeLoadJob({ jobs[1], jobs[2], jobs[3], jobs[4], jobs[5], jobs[6] }, "job7", job_func)); + + // Also add another not connected jobs + jobs.push_back(makeLoadJob({}, "job8", job_func)); + jobs.push_back(makeLoadJob({}, "job9", job_func)); + jobs.push_back(makeLoadJob({ jobs[9] }, "job10", job_func)); + + auto task1 = t.schedule({ jobs.begin(), jobs.end()}); + FAIL(); + } + catch (Exception & e) + { + int present[] = { 0, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0 }; + for (int i = 0; i < std::size(present); i++) + ASSERT_EQ(e.message().find(fmt::format("job{}", i)) != String::npos, present[i]); + } + + const_cast(cycle_breaker->dependencies).clear(); +} + +TEST(AsyncLoader, CancelPendingJob) +{ + AsyncLoaderTest t; + + auto job_func = [&] (const LoadJobPtr &) {}; + + auto job = makeLoadJob({}, "job", job_func); + auto task = t.schedule({ job }); + + task->remove(); // this cancels pending the job (async loader was not started to execute it) + + ASSERT_EQ(job->status(), LoadStatus::CANCELED); + try + { + job->wait(); + FAIL(); + } + catch (Exception & e) + { + ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_CANCELED); + } +} + +TEST(AsyncLoader, CancelPendingTask) +{ + AsyncLoaderTest t; + + auto job_func = [&] (const LoadJobPtr &) {}; + + auto job1 = makeLoadJob({}, "job1", job_func); + auto job2 = makeLoadJob({ job1 }, "job2", job_func); + auto task = t.schedule({ job1, job2 }); + + task->remove(); // this cancels both jobs (async loader was not started to execute it) + + ASSERT_EQ(job1->status(), LoadStatus::CANCELED); + ASSERT_EQ(job2->status(), LoadStatus::CANCELED); + + try + { + job1->wait(); + FAIL(); + } + catch (Exception & e) + { + ASSERT_TRUE(e.code() == ErrorCodes::ASYNC_LOAD_CANCELED); + } + + try + { + job2->wait(); + FAIL(); + } + catch (Exception & e) + { + ASSERT_TRUE(e.code() == ErrorCodes::ASYNC_LOAD_CANCELED); + } +} + +TEST(AsyncLoader, CancelPendingDependency) +{ + AsyncLoaderTest t; + + auto job_func = [&] (const LoadJobPtr &) {}; + + auto job1 = makeLoadJob({}, "job1", job_func); + auto job2 = makeLoadJob({ job1 }, "job2", job_func); + auto task1 = t.schedule({ job1 }); + auto task2 = t.schedule({ job2 }); + + task1->remove(); // this cancels both jobs, due to dependency (async loader was not started to execute it) + + ASSERT_EQ(job1->status(), LoadStatus::CANCELED); + ASSERT_EQ(job2->status(), LoadStatus::CANCELED); + + try + { + job1->wait(); + FAIL(); + } + catch (Exception & e) + { + ASSERT_TRUE(e.code() == ErrorCodes::ASYNC_LOAD_CANCELED); + } + + try + { + job2->wait(); + FAIL(); + } + catch (Exception & e) + { + ASSERT_TRUE(e.code() == ErrorCodes::ASYNC_LOAD_CANCELED); + } +} + +TEST(AsyncLoader, CancelExecutingJob) +{ + AsyncLoaderTest t; + t.loader.start(); + + std::barrier sync(2); + + auto job_func = [&] (const LoadJobPtr &) + { + sync.arrive_and_wait(); // (A) sync with main thread + sync.arrive_and_wait(); // (B) wait for waiter + // signals (C) + }; + + auto job = makeLoadJob({}, "job", job_func); + auto task = t.schedule({ job }); + + sync.arrive_and_wait(); // (A) wait for job to start executing + std::thread canceler([&] + { + task->remove(); // waits for (C) + }); + while (job->waitersCount() == 0) + std::this_thread::yield(); + ASSERT_EQ(job->status(), LoadStatus::PENDING); + sync.arrive_and_wait(); // (B) sync with job + canceler.join(); + + ASSERT_EQ(job->status(), LoadStatus::OK); + job->wait(); +} + +TEST(AsyncLoader, CancelExecutingTask) +{ + AsyncLoaderTest t(16); + t.loader.start(); + std::barrier sync(2); + + auto blocker_job_func = [&] (const LoadJobPtr &) + { + sync.arrive_and_wait(); // (A) sync with main thread + sync.arrive_and_wait(); // (B) wait for waiter + // signals (C) + }; + + auto job_to_cancel_func = [&] (const LoadJobPtr &) + { + FAIL(); // this job should be canceled + }; + + auto job_to_succeed_func = [&] (const LoadJobPtr &) + { + }; + + // Make several iterations to catch the race (if any) + for (int iteration = 0; iteration < 10; iteration++) { + std::vector task1_jobs; + task1_jobs.reserve(256); + auto blocker_job = makeLoadJob({}, "blocker_job", blocker_job_func); + task1_jobs.push_back(blocker_job); + for (int i = 0; i < 100; i++) + task1_jobs.push_back(makeLoadJob({ blocker_job }, "job_to_cancel", job_to_cancel_func)); + auto task1 = t.schedule({ task1_jobs.begin(), task1_jobs.end() }); + auto job_to_succeed = makeLoadJob({ blocker_job }, "job_to_succeed", job_to_succeed_func); + auto task2 = t.schedule({ job_to_succeed }); + + sync.arrive_and_wait(); // (A) wait for job to start executing + std::thread canceler([&] + { + task1->remove(); // waits for (C) + }); + while (blocker_job->waitersCount() == 0) + std::this_thread::yield(); + ASSERT_EQ(blocker_job->status(), LoadStatus::PENDING); + sync.arrive_and_wait(); // (B) sync with job + canceler.join(); + t.loader.wait(); + + ASSERT_EQ(blocker_job->status(), LoadStatus::OK); + ASSERT_EQ(job_to_succeed->status(), LoadStatus::OK); + for (const auto & job : task1_jobs) + { + if (job != blocker_job) + ASSERT_EQ(job->status(), LoadStatus::CANCELED); + } + } +} + +TEST(AsyncLoader, DISABLED_JobFailure) +{ + AsyncLoaderTest t; + t.loader.start(); + + std::string error_message = "test job failure"; + + auto job_func = [&] (const LoadJobPtr &) { + throw std::runtime_error(error_message); + }; + + auto job = makeLoadJob({}, "job", job_func); + auto task = t.schedule({ job }); + + t.loader.wait(); + + ASSERT_EQ(job->status(), LoadStatus::FAILED); + try + { + job->wait(); + FAIL(); + } + catch (Exception & e) + { + ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_FAILED); + ASSERT_TRUE(e.message().find(error_message) != String::npos); + } +} + +TEST(AsyncLoader, ScheduleJobWithFailedDependencies) +{ + AsyncLoaderTest t; + t.loader.start(); + + std::string_view error_message = "test job failure"; + + auto failed_job_func = [&] (const LoadJobPtr &) { + throw Exception(ErrorCodes::ASYNC_LOAD_FAILED, "{}", error_message); + }; + + auto failed_job = makeLoadJob({}, "failed_job", failed_job_func); + auto failed_task = t.schedule({ failed_job }); + + t.loader.wait(); + + auto job_func = [&] (const LoadJobPtr &) {}; + + auto job1 = makeLoadJob({ failed_job }, "job1", job_func); + auto job2 = makeLoadJob({ job1 }, "job2", job_func); + auto task = t.schedule({ job1, job2 }); + + t.loader.wait(); + + ASSERT_EQ(job1->status(), LoadStatus::CANCELED); + ASSERT_EQ(job2->status(), LoadStatus::CANCELED); + try + { + job1->wait(); + FAIL(); + } + catch (Exception & e) + { + ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_CANCELED); + ASSERT_TRUE(e.message().find(error_message) != String::npos); + } + try + { + job2->wait(); + FAIL(); + } + catch (Exception & e) + { + ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_CANCELED); + ASSERT_TRUE(e.message().find(error_message) != String::npos); + } +} + +TEST(AsyncLoader, ScheduleJobWithCanceledDependencies) +{ + AsyncLoaderTest t; + + auto canceled_job_func = [&] (const LoadJobPtr &) {}; + auto canceled_job = makeLoadJob({}, "canceled_job", canceled_job_func); + auto canceled_task = t.schedule({ canceled_job }); + canceled_task->remove(); + + t.loader.start(); + + auto job_func = [&] (const LoadJobPtr &) {}; + auto job1 = makeLoadJob({ canceled_job }, "job1", job_func); + auto job2 = makeLoadJob({ job1 }, "job2", job_func); + auto task = t.schedule({ job1, job2 }); + + t.loader.wait(); + + ASSERT_EQ(job1->status(), LoadStatus::CANCELED); + ASSERT_EQ(job2->status(), LoadStatus::CANCELED); + try + { + job1->wait(); + FAIL(); + } + catch (Exception & e) + { + ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_CANCELED); + } + try + { + job2->wait(); + FAIL(); + } + catch (Exception & e) + { + ASSERT_EQ(e.code(), ErrorCodes::ASYNC_LOAD_CANCELED); + } +} + +TEST(AsyncLoader, TestConcurrency) +{ + AsyncLoaderTest t(10); + t.loader.start(); + + for (int concurrency = 1; concurrency <= 10; concurrency++) + { + std::barrier sync(concurrency); + + std::atomic executing{0}; + auto job_func = [&] (const LoadJobPtr &) + { + executing++; + ASSERT_LE(executing, concurrency); + sync.arrive_and_wait(); + executing--; + }; + + std::vector tasks; + tasks.reserve(concurrency); + for (int i = 0; i < concurrency; i++) + tasks.push_back(t.schedule(t.chainJobSet(5, job_func))); + t.loader.wait(); + ASSERT_EQ(executing, 0); + } +} + +TEST(AsyncLoader, TestOverload) +{ + AsyncLoaderTest t(3); + t.loader.start(); + + size_t max_threads = t.loader.getMaxThreads(); + std::atomic executing{0}; + + for (int concurrency = 4; concurrency <= 8; concurrency++) + { + auto job_func = [&] (const LoadJobPtr &) + { + executing++; + t.randomSleepUs(100, 200, 100); + ASSERT_LE(executing, max_threads); + executing--; + }; + + t.loader.stop(); + std::vector tasks; + tasks.reserve(concurrency); + for (int i = 0; i < concurrency; i++) + tasks.push_back(t.schedule(t.chainJobSet(5, job_func))); + t.loader.start(); + t.loader.wait(); + ASSERT_EQ(executing, 0); + } +} + +TEST(AsyncLoader, StaticPriorities) +{ + AsyncLoaderTest t(1); + + std::string schedule; + + auto job_func = [&] (const LoadJobPtr & self) + { + schedule += fmt::format("{}{}", self->name, self->priority()); + }; + + std::vector jobs; + jobs.push_back(makeLoadJob({}, 0, "A", job_func)); // 0 + jobs.push_back(makeLoadJob({ jobs[0] }, 3, "B", job_func)); // 1 + jobs.push_back(makeLoadJob({ jobs[0] }, 4, "C", job_func)); // 2 + jobs.push_back(makeLoadJob({ jobs[0] }, 1, "D", job_func)); // 3 + jobs.push_back(makeLoadJob({ jobs[0] }, 2, "E", job_func)); // 4 + jobs.push_back(makeLoadJob({ jobs[3], jobs[4] }, 0, "F", job_func)); // 5 + jobs.push_back(makeLoadJob({ jobs[5] }, 0, "G", job_func)); // 6 + jobs.push_back(makeLoadJob({ jobs[6] }, 9, "H", job_func)); // 7 + auto task = t.schedule({ jobs.begin(), jobs.end() }); + + t.loader.start(); + t.loader.wait(); + + ASSERT_EQ(schedule, "A9E9D9F9G9H9C4B3"); +} + +TEST(AsyncLoader, DynamicPriorities) +{ + AsyncLoaderTest t(1); + + for (bool prioritize : {false, true}) + { + std::string schedule; + + LoadJobPtr job_to_prioritize; + + auto job_func = [&] (const LoadJobPtr & self) + { + if (prioritize && self->name == "C") + t.loader.prioritize(job_to_prioritize, 9); // dynamic prioritization + schedule += fmt::format("{}{}", self->name, self->priority()); + }; + + // Job DAG with initial priorities. During execution of C4, job G0 priority is increased to G9, postponing B3 job executing. + // A0 -+-> B3 + // | + // `-> C4 + // | + // `-> D1 -. + // | +-> F0 --> G0 --> H0 + // `-> E2 -' + std::vector jobs; + jobs.push_back(makeLoadJob({}, 0, "A", job_func)); // 0 + jobs.push_back(makeLoadJob({ jobs[0] }, 3, "B", job_func)); // 1 + jobs.push_back(makeLoadJob({ jobs[0] }, 4, "C", job_func)); // 2 + jobs.push_back(makeLoadJob({ jobs[0] }, 1, "D", job_func)); // 3 + jobs.push_back(makeLoadJob({ jobs[0] }, 2, "E", job_func)); // 4 + jobs.push_back(makeLoadJob({ jobs[3], jobs[4] }, 0, "F", job_func)); // 5 + jobs.push_back(makeLoadJob({ jobs[5] }, 0, "G", job_func)); // 6 + jobs.push_back(makeLoadJob({ jobs[6] }, 0, "H", job_func)); // 7 + auto task = t.schedule({ jobs.begin(), jobs.end() }); + + job_to_prioritize = jobs[6]; + + t.loader.start(); + t.loader.wait(); + t.loader.stop(); + + if (prioritize) + ASSERT_EQ(schedule, "A4C4E9D9F9G9B3H0"); + else + ASSERT_EQ(schedule, "A4C4B3E2D1F0G0H0"); + } +} + +TEST(AsyncLoader, RandomIndependentTasks) +{ + AsyncLoaderTest t(16); + t.loader.start(); + + auto job_func = [&] (const LoadJobPtr & self) + { + for (const auto & dep : self->dependencies) + ASSERT_EQ(dep->status(), LoadStatus::OK); + t.randomSleepUs(100, 500, 5); + }; + + std::vector tasks; + tasks.reserve(512); + for (int i = 0; i < 512; i++) + { + int job_count = t.randomInt(1, 32); + tasks.push_back(t.schedule(t.randomJobSet(job_count, 5, job_func))); + t.randomSleepUs(100, 900, 20); // avg=100us + } +} + +TEST(AsyncLoader, RandomDependentTasks) +{ + AsyncLoaderTest t(16); + t.loader.start(); + + std::mutex mutex; + std::condition_variable cv; + std::vector tasks; + std::vector all_jobs; + + auto job_func = [&] (const LoadJobPtr & self) + { + for (const auto & dep : self->dependencies) + ASSERT_EQ(dep->status(), LoadStatus::OK); + cv.notify_one(); + }; + + std::unique_lock lock{mutex}; + + int tasks_left = 1000; + tasks.reserve(tasks_left); + while (tasks_left-- > 0) + { + cv.wait(lock, [&] { return t.loader.getScheduledJobCount() < 100; }); + + // Add one new task + int job_count = t.randomInt(1, 32); + LoadJobSet jobs = t.randomJobSet(job_count, 5, all_jobs, job_func); + all_jobs.insert(all_jobs.end(), jobs.begin(), jobs.end()); + tasks.push_back(t.schedule(std::move(jobs))); + + // Cancel random old task + if (tasks.size() > 100) + tasks.erase(tasks.begin() + t.randomInt(0, tasks.size() - 1)); + } + + t.loader.wait(); +} + +TEST(AsyncLoader, SetMaxThreads) +{ + AsyncLoaderTest t(1); + + std::atomic sync_index{0}; + std::atomic executing{0}; + int max_threads_values[] = {1, 2, 3, 4, 5, 4, 3, 2, 1, 5, 10, 5, 1, 20, 1}; + std::vector>> syncs; + syncs.reserve(std::size(max_threads_values)); + for (int max_threads : max_threads_values) + syncs.push_back(std::make_unique>(max_threads + 1)); + + + auto job_func = [&] (const LoadJobPtr &) + { + int idx = sync_index; + if (idx < syncs.size()) + { + executing++; + syncs[idx]->arrive_and_wait(); // (A) + executing--; + syncs[idx]->arrive_and_wait(); // (B) + } + }; + + // Generate enough independent jobs + for (int i = 0; i < 1000; i++) + t.schedule({makeLoadJob({}, "job", job_func)})->detach(); + + t.loader.start(); + while (sync_index < syncs.size()) + { + // Wait for `max_threads` jobs to start executing + int idx = sync_index; + while (executing.load() != max_threads_values[idx]) + { + ASSERT_LE(executing, max_threads_values[idx]); + std::this_thread::yield(); + } + + // Allow all jobs to finish + syncs[idx]->arrive_and_wait(); // (A) + sync_index++; + if (sync_index < syncs.size()) + t.loader.setMaxThreads(max_threads_values[sync_index]); + syncs[idx]->arrive_and_wait(); // (B) this sync point is required to allow `executing` value to go back down to zero after we change number of workers + } + t.loader.wait(); +} diff --git a/src/Databases/TablesLoader.cpp b/src/Databases/TablesLoader.cpp index ec0fdd85eec..ea0f2072430 100644 --- a/src/Databases/TablesLoader.cpp +++ b/src/Databases/TablesLoader.cpp @@ -25,18 +25,6 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; } -static constexpr size_t PRINT_MESSAGE_EACH_N_OBJECTS = 256; -static constexpr size_t PRINT_MESSAGE_EACH_N_SECONDS = 5; - -void logAboutProgress(Poco::Logger * log, size_t processed, size_t total, AtomicStopwatch & watch) -{ - if (processed % PRINT_MESSAGE_EACH_N_OBJECTS == 0 || watch.compareAndRestart(PRINT_MESSAGE_EACH_N_SECONDS)) - { - LOG_INFO(log, "{}%", processed * 100.0 / total); - watch.restart(); - } -} - TablesLoader::TablesLoader(ContextMutablePtr global_context_, Databases databases_, LoadingStrictnessLevel strictness_mode_) : global_context(global_context_) , databases(std::move(databases_))