add LoadStatus::CANCELED

This commit is contained in:
serxa 2023-04-14 14:00:54 +00:00
parent a83fc81ed1
commit 78d5a4d88b
2 changed files with 67 additions and 53 deletions

View File

@ -36,9 +36,10 @@ namespace ErrorCodes
enum class LoadStatus
{
PENDING, // Load is not finished yet
SUCCESS, // Load was successful
FAILED // Load failed or canceled
PENDING, // Load job is not started yet
OK, // Load job executed and was successful
FAILED, // Load job executed and failed
CANCELED // Load job is not going to be executed due to removal or dependency failure
};
class LoadJob : private boost::noncopyable
@ -54,14 +55,14 @@ public:
LoadStatus status() const
{
std::unique_lock lock{mutex};
return !is_finished ? LoadStatus::PENDING : (exception ? LoadStatus::FAILED : LoadStatus::SUCCESS);
return load_status;
}
void wait() const
{
std::unique_lock lock{mutex};
waiters++;
finished.wait(lock, [this] { return is_finished; });
finished.wait(lock, [this] { return load_status != LoadStatus::PENDING; });
waiters--;
if (exception)
std::rethrow_exception(exception);
@ -71,7 +72,7 @@ public:
{
std::unique_lock lock{mutex};
waiters++;
finished.wait(lock, [this] { return is_finished; });
finished.wait(lock, [this] { return load_status != LoadStatus::PENDING; });
waiters--;
}
@ -88,18 +89,27 @@ public:
private:
friend class AsyncLoader;
void setSuccess()
void ok()
{
std::unique_lock lock{mutex};
is_finished = true;
load_status = LoadStatus::OK;
if (waiters > 0)
finished.notify_all();
}
void setFailure(const std::exception_ptr & ptr)
void failed(const std::exception_ptr & ptr)
{
std::unique_lock lock{mutex};
is_finished = true;
load_status = LoadStatus::FAILED;
exception = ptr;
if (waiters > 0)
finished.notify_all();
}
void canceled(const std::exception_ptr & ptr)
{
std::unique_lock lock{mutex};
load_status = LoadStatus::CANCELED;
exception = ptr;
if (waiters > 0)
finished.notify_all();
@ -112,7 +122,7 @@ private:
mutable std::mutex mutex;
mutable std::condition_variable finished;
mutable size_t waiters = 0;
bool is_finished = false;
LoadStatus load_status{LoadStatus::PENDING};
std::exception_ptr exception;
};
@ -148,6 +158,12 @@ private:
UInt64 ready_seqno = 0; // zero means that job is not in ready queue
LoadJobSet dependent_jobs;
// Three independent states of a non-finished jobs
bool is_blocked() const { return dependencies_left > 0; }
bool is_ready() const { return dependencies_left == 0 && ready_seqno > 0; }
bool is_executing() const { return dependencies_left == 0 && ready_seqno == 0; }
// Get key of a ready job
ReadyKey key() const
{
return {priority, ready_seqno};
@ -318,8 +334,8 @@ public:
prioritize(dep, priority, lock);
}
// Place jobs w/o dependencies to ready queue
if (info.dependencies_left == 0)
// Enqueue non-blocked jobs (w/o dependencies) to ready queue
if (!info.is_blocked())
enqueue(info, job, lock);
}
@ -346,14 +362,22 @@ public:
{
if (auto info = scheduled_jobs.find(job); info != scheduled_jobs.end())
{
if (info->second.dependencies_left > 0) // Job is not ready yet
canceled(job, lock);
else if (info->second.ready_seqno) // Job is enqueued in ready queue
if (info->second.is_executing())
continue; // Skip executing jobs on the first pass
if (info->second.is_ready())
{
ready_queue.erase(info->second.key());
info->second.ready_seqno = 0;
canceled(job, lock);
}
std::exception_ptr e;
NOEXCEPT_SCOPE({
ALLOW_ALLOCATIONS_IN_SCOPE;
e = std::make_exception_ptr(
Exception(ErrorCodes::ASYNC_LOAD_CANCELED,
"Load job '{}' canceled",
job->name));
});
markNotOk(job, e, LoadStatus::CANCELED, lock);
}
}
// On the second pass wait for executing jobs to finish
@ -362,8 +386,7 @@ public:
if (auto info = scheduled_jobs.find(job); info != scheduled_jobs.end())
{
// Job is currently executing
chassert(info->second.dependencies_left == 0);
chassert(info->second.ready_seqno == 0);
chassert(info->second.is_executing());
lock.unlock();
job->waitNoThrow(); // Wait for job to finish
lock.lock();
@ -417,23 +440,10 @@ private:
return {};
}
void canceled(const LoadJobPtr & job, std::unique_lock<std::mutex> & lock)
{
std::exception_ptr e;
NOEXCEPT_SCOPE({
ALLOW_ALLOCATIONS_IN_SCOPE;
e = std::make_exception_ptr(
Exception(ErrorCodes::ASYNC_LOAD_CANCELED,
"Load job '{}' canceled",
job->name));
});
failed(job, e, lock);
}
void loaded(const LoadJobPtr & job, std::unique_lock<std::mutex> & lock)
void markOk(const LoadJobPtr & job, std::unique_lock<std::mutex> & lock)
{
// Notify waiters
job->setSuccess();
job->ok();
// Update dependent jobs and enqueue if ready
chassert(scheduled_jobs.contains(job)); // Job was pending
@ -441,17 +451,21 @@ private:
{
chassert(scheduled_jobs.contains(dep)); // All depended jobs must be pending
Info & dep_info = scheduled_jobs[dep];
if (--dep_info.dependencies_left == 0)
dep_info.dependencies_left--;
if (!dep_info.is_blocked())
enqueue(dep_info, dep, lock);
}
finish(job, lock);
}
void failed(const LoadJobPtr & job, std::exception_ptr exception_from_job, std::unique_lock<std::mutex> & lock)
void markNotOk(const LoadJobPtr & job, std::exception_ptr exception_from_job, LoadStatus status, std::unique_lock<std::mutex> & lock)
{
// Notify waiters
job->setFailure(exception_from_job);
if (status == LoadStatus::FAILED)
job->failed(exception_from_job);
else if (status == LoadStatus::CANCELED)
job->canceled(exception_from_job);
// Recurse into all dependent jobs
chassert(scheduled_jobs.contains(job)); // Job was pending
@ -469,7 +483,7 @@ private:
dep->name,
getExceptionMessage(exception_from_job, /* with_stack_trace = */ false)));
});
failed(dep, e, lock);
markNotOk(dep, e, LoadStatus::CANCELED, lock);
}
// Clean dependency graph edges
@ -518,7 +532,7 @@ private:
void enqueue(Info & info, const LoadJobPtr & job, std::unique_lock<std::mutex> & lock)
{
chassert(info.dependencies_left == 0);
chassert(!info.is_blocked());
chassert(info.ready_seqno == 0);
info.ready_seqno = ++last_ready_seqno;
NOEXCEPT_SCOPE({
@ -555,9 +569,9 @@ private:
// Handle just executed job
if (exception_from_job)
failed(job, exception_from_job, lock);
markNotOk(job, exception_from_job, LoadStatus::FAILED, lock);
else if (job)
loaded(job, lock);
markOk(job, lock);
if (!is_running)
return;

View File

@ -120,8 +120,8 @@ TEST(AsyncLoader, Smoke)
waiter_thread.join();
ASSERT_EQ(job1->status(), LoadStatus::SUCCESS);
ASSERT_EQ(job2->status(), LoadStatus::SUCCESS);
ASSERT_EQ(job1->status(), LoadStatus::OK);
ASSERT_EQ(job2->status(), LoadStatus::OK);
}
ASSERT_EQ(jobs_done, 5);
@ -185,7 +185,7 @@ TEST(AsyncLoader, CancelPendingJob)
task.remove(); // this cancels pending the job (async loader was not started to execute it)
ASSERT_EQ(job->status(), LoadStatus::FAILED);
ASSERT_EQ(job->status(), LoadStatus::CANCELED);
try
{
job->wait();
@ -209,8 +209,8 @@ TEST(AsyncLoader, CancelPendingTask)
task.remove(); // this cancels both jobs (async loader was not started to execute it)
ASSERT_EQ(job1->status(), LoadStatus::FAILED);
ASSERT_EQ(job2->status(), LoadStatus::FAILED);
ASSERT_EQ(job1->status(), LoadStatus::CANCELED);
ASSERT_EQ(job2->status(), LoadStatus::CANCELED);
try
{
@ -247,8 +247,8 @@ TEST(AsyncLoader, CancelPendingDependency)
task1.remove(); // this cancels both jobs, due to dependency (async loader was not started to execute it)
ASSERT_EQ(job1->status(), LoadStatus::FAILED);
ASSERT_EQ(job2->status(), LoadStatus::FAILED);
ASSERT_EQ(job1->status(), LoadStatus::CANCELED);
ASSERT_EQ(job2->status(), LoadStatus::CANCELED);
try
{
@ -299,7 +299,7 @@ TEST(AsyncLoader, CancelExecutingJob)
sync.arrive_and_wait(); // (B) sync with job
canceler.join();
ASSERT_EQ(job->status(), LoadStatus::SUCCESS);
ASSERT_EQ(job->status(), LoadStatus::OK);
job->wait();
}
@ -348,12 +348,12 @@ TEST(AsyncLoader, CancelExecutingTask)
canceler.join();
t.loader.wait();
ASSERT_EQ(blocker_job->status(), LoadStatus::SUCCESS);
ASSERT_EQ(job_to_succeed->status(), LoadStatus::SUCCESS);
ASSERT_EQ(blocker_job->status(), LoadStatus::OK);
ASSERT_EQ(job_to_succeed->status(), LoadStatus::OK);
for (const auto & job : task1_jobs)
{
if (job != blocker_job)
ASSERT_EQ(job->status(), LoadStatus::FAILED);
ASSERT_EQ(job->status(), LoadStatus::CANCELED);
}
}
}
@ -420,7 +420,7 @@ TEST(AsyncLoader, RandomTasks)
auto job_func = [&] (const LoadJobPtr & self)
{
for (const auto & dep : self->dependencies)
ASSERT_EQ(dep->status(), LoadStatus::SUCCESS);
ASSERT_EQ(dep->status(), LoadStatus::OK);
t.randomSleepUs(100, 500, 5);
};