fix deadlock in AsyncLoader::stop()

This commit is contained in:
serxa 2024-01-28 15:47:17 +00:00
parent 29d54dab55
commit aa6c7e78be
3 changed files with 43 additions and 18 deletions

View File

@ -2,6 +2,7 @@
#include <limits>
#include <optional>
#include <fmt/format.h>
#include <base/defines.h>
#include <base/scope_guard.h>
#include <Common/ErrorCodes.h>
@ -195,13 +196,6 @@ void LoadTask::remove()
}
}
void LoadTask::detach()
{
jobs.clear();
goal_jobs.clear();
}
AsyncLoader::AsyncLoader(std::vector<PoolInitializer> pool_initializers, bool log_failures_, bool log_progress_)
: log_failures(log_failures_)
, log_progress(log_progress_)
@ -214,7 +208,22 @@ AsyncLoader::AsyncLoader(std::vector<PoolInitializer> pool_initializers, bool lo
AsyncLoader::~AsyncLoader()
{
stop();
// All `LoadTask` objects should be destructed before AsyncLoader destruction because they hold a reference.
// To make sure we check for all pending jobs to be finished.
std::unique_lock lock{mutex};
if (scheduled_jobs.empty() && finished_jobs.empty())
return;
std::vector<String> scheduled;
std::vector<String> finished;
scheduled.reserve(scheduled_jobs.size());
finished.reserve(finished_jobs.size());
for (const auto & [job, _] : scheduled_jobs)
scheduled.push_back(job->name);
for (const auto & job : finished_jobs)
finished.push_back(job->name);
LOG_ERROR(log, "Bug. Destruction with pending ({}) and finished ({}) load jobs.", fmt::join(scheduled, ", "), fmt::join(finished, ", "));
abort();
}
void AsyncLoader::start()
@ -236,6 +245,17 @@ void AsyncLoader::wait()
for (auto & p : pools)
p.thread_pool->wait();
lock.lock();
// If there is no way for all jobs to finish, throw LOGICAL_ERROR instead of deadlock
if (!scheduled_jobs.empty() && !hasWorker(lock))
{
std::vector<String> names;
names.reserve(scheduled_jobs.size());
for (const auto & [job, _] : scheduled_jobs)
names.push_back(job->name);
LOG_ERROR(log, "Waiting for load jobs to finish while being stopped: {}.", fmt::join(names, ", "));
abort();
}
}
}
@ -243,10 +263,12 @@ void AsyncLoader::stop()
{
{
std::unique_lock lock{mutex};
is_running = false;
// NOTE: there is no need to notify because workers never wait
is_running = false; // NOTE: there is no need to notify because workers never wait
}
wait();
// Wait for all currently running jobs to finish (and do NOT wait all pending jobs)
for (auto & p : pools)
p.thread_pool->wait();
}
void AsyncLoader::schedule(LoadTask & task)

View File

@ -198,10 +198,6 @@ public:
// Remove all jobs of this task from AsyncLoader.
void remove();
// Do not track jobs in this task.
// WARNING: Jobs will never be removed() and are going to be stored as finished jobs until ~AsyncLoader().
void detach();
// Return the final jobs in this tasks. This job subset should be used as `dependencies` for dependent jobs or tasks:
// auto load_task = loadSomethingAsync(async_loader, load_after_task.goals(), something);
const LoadJobSet & goals() const { return goal_jobs.empty() ? jobs : goal_jobs; }
@ -333,7 +329,6 @@ private:
public:
AsyncLoader(std::vector<PoolInitializer> pool_initializers, bool log_failures_, bool log_progress_);
// Stops AsyncLoader before destruction
// WARNING: all tasks instances should be destructed before associated AsyncLoader.
~AsyncLoader();

View File

@ -622,7 +622,13 @@ TEST(AsyncLoader, CustomDependencyFailure)
auto dependent_job1 = makeLoadJob({ collect_job }, "dependent_job1", dependent_job_func);
auto dependent_job2 = makeLoadJob({ collect_job }, "dependent_job2", dependent_job_func);
auto dependent_job3 = makeLoadJob({ collect_job }, "dependent_job3", dependent_job_func);
auto task = t.schedule({ dependent_job1, dependent_job2, dependent_job3 }); // Other jobs should be discovery automatically
auto task = t.schedule({
dependent_job1, dependent_job2, dependent_job3,
collect_job,
late_dep1, late_dep2, late_dep3,
good_dep1, good_dep2, good_dep3,
evil_dep1, evil_dep2, evil_dep3,
});
t.loader.wait(collect_job, true);
canceled_sync.arrive_and_wait(); // (A)
@ -1022,8 +1028,10 @@ TEST(AsyncLoader, SetMaxThreads)
};
// Generate enough independent jobs
std::vector<LoadTaskPtr> tasks;
tasks.reserve(1000);
for (int i = 0; i < 1000; i++)
t.schedule({makeLoadJob({}, "job", job_func)})->detach();
tasks.push_back(t.schedule({makeLoadJob({}, "job", job_func)}));
t.loader.start();
while (sync_index < syncs.size())