From 78d5a4d88b44cffd828ff0bf8bf8126971f4840d Mon Sep 17 00:00:00 2001 From: serxa Date: Fri, 14 Apr 2023 14:00:54 +0000 Subject: [PATCH] add LoadStatus::CANCELED --- src/Common/AsyncLoader.h | 96 ++++++++++++++----------- src/Common/tests/gtest_async_loader.cpp | 24 +++---- 2 files changed, 67 insertions(+), 53 deletions(-) diff --git a/src/Common/AsyncLoader.h b/src/Common/AsyncLoader.h index 96ea6165d34..6af1b79bec8 100644 --- a/src/Common/AsyncLoader.h +++ b/src/Common/AsyncLoader.h @@ -36,9 +36,10 @@ namespace ErrorCodes enum class LoadStatus { - PENDING, // Load is not finished yet - SUCCESS, // Load was successful - FAILED // Load failed or canceled + PENDING, // Load job is not started yet + OK, // Load job executed and was successful + FAILED, // Load job executed and failed + CANCELED // Load job is not going to be executed due to removal or dependency failure }; class LoadJob : private boost::noncopyable @@ -54,14 +55,14 @@ public: LoadStatus status() const { std::unique_lock lock{mutex}; - return !is_finished ? LoadStatus::PENDING : (exception ? LoadStatus::FAILED : LoadStatus::SUCCESS); + return load_status; } void wait() const { std::unique_lock lock{mutex}; waiters++; - finished.wait(lock, [this] { return is_finished; }); + finished.wait(lock, [this] { return load_status != LoadStatus::PENDING; }); waiters--; if (exception) std::rethrow_exception(exception); @@ -71,7 +72,7 @@ public: { std::unique_lock lock{mutex}; waiters++; - finished.wait(lock, [this] { return is_finished; }); + finished.wait(lock, [this] { return load_status != LoadStatus::PENDING; }); waiters--; } @@ -88,18 +89,27 @@ public: private: friend class AsyncLoader; - void setSuccess() + void ok() { std::unique_lock lock{mutex}; - is_finished = true; + load_status = LoadStatus::OK; if (waiters > 0) finished.notify_all(); } - void setFailure(const std::exception_ptr & ptr) + void failed(const std::exception_ptr & ptr) { std::unique_lock lock{mutex}; - is_finished = true; + load_status = LoadStatus::FAILED; + exception = ptr; + if (waiters > 0) + finished.notify_all(); + } + + void canceled(const std::exception_ptr & ptr) + { + std::unique_lock lock{mutex}; + load_status = LoadStatus::CANCELED; exception = ptr; if (waiters > 0) finished.notify_all(); @@ -112,7 +122,7 @@ private: mutable std::mutex mutex; mutable std::condition_variable finished; mutable size_t waiters = 0; - bool is_finished = false; + LoadStatus load_status{LoadStatus::PENDING}; std::exception_ptr exception; }; @@ -148,6 +158,12 @@ private: UInt64 ready_seqno = 0; // zero means that job is not in ready queue LoadJobSet dependent_jobs; + // Three independent states of a non-finished jobs + bool is_blocked() const { return dependencies_left > 0; } + bool is_ready() const { return dependencies_left == 0 && ready_seqno > 0; } + bool is_executing() const { return dependencies_left == 0 && ready_seqno == 0; } + + // Get key of a ready job ReadyKey key() const { return {priority, ready_seqno}; @@ -318,8 +334,8 @@ public: prioritize(dep, priority, lock); } - // Place jobs w/o dependencies to ready queue - if (info.dependencies_left == 0) + // Enqueue non-blocked jobs (w/o dependencies) to ready queue + if (!info.is_blocked()) enqueue(info, job, lock); } @@ -346,14 +362,22 @@ public: { if (auto info = scheduled_jobs.find(job); info != scheduled_jobs.end()) { - if (info->second.dependencies_left > 0) // Job is not ready yet - canceled(job, lock); - else if (info->second.ready_seqno) // Job is enqueued in ready queue + if (info->second.is_executing()) + continue; // Skip executing jobs on the first pass + if (info->second.is_ready()) { ready_queue.erase(info->second.key()); info->second.ready_seqno = 0; - canceled(job, lock); } + std::exception_ptr e; + NOEXCEPT_SCOPE({ + ALLOW_ALLOCATIONS_IN_SCOPE; + e = std::make_exception_ptr( + Exception(ErrorCodes::ASYNC_LOAD_CANCELED, + "Load job '{}' canceled", + job->name)); + }); + markNotOk(job, e, LoadStatus::CANCELED, lock); } } // On the second pass wait for executing jobs to finish @@ -362,8 +386,7 @@ public: if (auto info = scheduled_jobs.find(job); info != scheduled_jobs.end()) { // Job is currently executing - chassert(info->second.dependencies_left == 0); - chassert(info->second.ready_seqno == 0); + chassert(info->second.is_executing()); lock.unlock(); job->waitNoThrow(); // Wait for job to finish lock.lock(); @@ -417,23 +440,10 @@ private: return {}; } - void canceled(const LoadJobPtr & job, std::unique_lock & lock) - { - std::exception_ptr e; - NOEXCEPT_SCOPE({ - ALLOW_ALLOCATIONS_IN_SCOPE; - e = std::make_exception_ptr( - Exception(ErrorCodes::ASYNC_LOAD_CANCELED, - "Load job '{}' canceled", - job->name)); - }); - failed(job, e, lock); - } - - void loaded(const LoadJobPtr & job, std::unique_lock & lock) + void markOk(const LoadJobPtr & job, std::unique_lock & lock) { // Notify waiters - job->setSuccess(); + job->ok(); // Update dependent jobs and enqueue if ready chassert(scheduled_jobs.contains(job)); // Job was pending @@ -441,17 +451,21 @@ private: { chassert(scheduled_jobs.contains(dep)); // All depended jobs must be pending Info & dep_info = scheduled_jobs[dep]; - if (--dep_info.dependencies_left == 0) + dep_info.dependencies_left--; + if (!dep_info.is_blocked()) enqueue(dep_info, dep, lock); } finish(job, lock); } - void failed(const LoadJobPtr & job, std::exception_ptr exception_from_job, std::unique_lock & lock) + void markNotOk(const LoadJobPtr & job, std::exception_ptr exception_from_job, LoadStatus status, std::unique_lock & lock) { // Notify waiters - job->setFailure(exception_from_job); + if (status == LoadStatus::FAILED) + job->failed(exception_from_job); + else if (status == LoadStatus::CANCELED) + job->canceled(exception_from_job); // Recurse into all dependent jobs chassert(scheduled_jobs.contains(job)); // Job was pending @@ -469,7 +483,7 @@ private: dep->name, getExceptionMessage(exception_from_job, /* with_stack_trace = */ false))); }); - failed(dep, e, lock); + markNotOk(dep, e, LoadStatus::CANCELED, lock); } // Clean dependency graph edges @@ -518,7 +532,7 @@ private: void enqueue(Info & info, const LoadJobPtr & job, std::unique_lock & lock) { - chassert(info.dependencies_left == 0); + chassert(!info.is_blocked()); chassert(info.ready_seqno == 0); info.ready_seqno = ++last_ready_seqno; NOEXCEPT_SCOPE({ @@ -555,9 +569,9 @@ private: // Handle just executed job if (exception_from_job) - failed(job, exception_from_job, lock); + markNotOk(job, exception_from_job, LoadStatus::FAILED, lock); else if (job) - loaded(job, lock); + markOk(job, lock); if (!is_running) return; diff --git a/src/Common/tests/gtest_async_loader.cpp b/src/Common/tests/gtest_async_loader.cpp index d913f8f6362..d7706311fa4 100644 --- a/src/Common/tests/gtest_async_loader.cpp +++ b/src/Common/tests/gtest_async_loader.cpp @@ -120,8 +120,8 @@ TEST(AsyncLoader, Smoke) waiter_thread.join(); - ASSERT_EQ(job1->status(), LoadStatus::SUCCESS); - ASSERT_EQ(job2->status(), LoadStatus::SUCCESS); + ASSERT_EQ(job1->status(), LoadStatus::OK); + ASSERT_EQ(job2->status(), LoadStatus::OK); } ASSERT_EQ(jobs_done, 5); @@ -185,7 +185,7 @@ TEST(AsyncLoader, CancelPendingJob) task.remove(); // this cancels pending the job (async loader was not started to execute it) - ASSERT_EQ(job->status(), LoadStatus::FAILED); + ASSERT_EQ(job->status(), LoadStatus::CANCELED); try { job->wait(); @@ -209,8 +209,8 @@ TEST(AsyncLoader, CancelPendingTask) task.remove(); // this cancels both jobs (async loader was not started to execute it) - ASSERT_EQ(job1->status(), LoadStatus::FAILED); - ASSERT_EQ(job2->status(), LoadStatus::FAILED); + ASSERT_EQ(job1->status(), LoadStatus::CANCELED); + ASSERT_EQ(job2->status(), LoadStatus::CANCELED); try { @@ -247,8 +247,8 @@ TEST(AsyncLoader, CancelPendingDependency) task1.remove(); // this cancels both jobs, due to dependency (async loader was not started to execute it) - ASSERT_EQ(job1->status(), LoadStatus::FAILED); - ASSERT_EQ(job2->status(), LoadStatus::FAILED); + ASSERT_EQ(job1->status(), LoadStatus::CANCELED); + ASSERT_EQ(job2->status(), LoadStatus::CANCELED); try { @@ -299,7 +299,7 @@ TEST(AsyncLoader, CancelExecutingJob) sync.arrive_and_wait(); // (B) sync with job canceler.join(); - ASSERT_EQ(job->status(), LoadStatus::SUCCESS); + ASSERT_EQ(job->status(), LoadStatus::OK); job->wait(); } @@ -348,12 +348,12 @@ TEST(AsyncLoader, CancelExecutingTask) canceler.join(); t.loader.wait(); - ASSERT_EQ(blocker_job->status(), LoadStatus::SUCCESS); - ASSERT_EQ(job_to_succeed->status(), LoadStatus::SUCCESS); + ASSERT_EQ(blocker_job->status(), LoadStatus::OK); + ASSERT_EQ(job_to_succeed->status(), LoadStatus::OK); for (const auto & job : task1_jobs) { if (job != blocker_job) - ASSERT_EQ(job->status(), LoadStatus::FAILED); + ASSERT_EQ(job->status(), LoadStatus::CANCELED); } } } @@ -420,7 +420,7 @@ TEST(AsyncLoader, RandomTasks) auto job_func = [&] (const LoadJobPtr & self) { for (const auto & dep : self->dependencies) - ASSERT_EQ(dep->status(), LoadStatus::SUCCESS); + ASSERT_EQ(dep->status(), LoadStatus::OK); t.randomSleepUs(100, 500, 5); };