Merge pull request #61546 from ClickHouse/more-log-for-async-loader

More logging for loading of tables
Sergei Trifonov 2024-03-25 13:12:45 +01:00 committed by GitHub
commit 10c6152aac
4 changed files with 64 additions and 7 deletions
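The change threads a new log_events flag through AsyncLoader. When it is set, every significant lifecycle event is logged at DEBUG level: job scheduling, execution, completion, waits, prioritization, worker spawn/stop, and changes of the current priority. Each site uses the same pattern: a branch on a const flag, with the log call wrapped in NOEXCEPT_SCOPE/ALLOW_ALLOCATIONS_IN_SCOPE where the surrounding code forbids allocation. Below is a minimal standalone C++ sketch of that gating pattern; EventLogger and the std::cout output are illustrative stand-ins, not ClickHouse code.

#include <iostream>
#include <string>

struct EventLogger
{
    const bool log_events; // Fixed at construction, mirroring AsyncLoader's flag.

    void onSchedule(const std::string & job, const std::string & pool) const
    {
        if (log_events) // A single predictable branch when event logging is off.
            std::cout << "Schedule load job '" << job << "' into " << pool << '\n';
    }
};

int main()
{
    EventLogger logger{/* log_events = */ true};
    logger.onSchedule("load table db.t", "BackgroundLoad");
}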

src/Common/AsyncLoader.cpp

@@ -2,6 +2,7 @@
#include <limits>
#include <optional>
+#include <magic_enum.hpp>
#include <fmt/format.h>
#include <base/defines.h>
#include <base/scope_guard.h>
@@ -201,9 +202,10 @@ void LoadTask::remove()
}
}
-AsyncLoader::AsyncLoader(std::vector<PoolInitializer> pool_initializers, bool log_failures_, bool log_progress_)
+AsyncLoader::AsyncLoader(std::vector<PoolInitializer> pool_initializers, bool log_failures_, bool log_progress_, bool log_events_)
: log_failures(log_failures_)
, log_progress(log_progress_)
+, log_events(log_events_)
, log(getLogger("AsyncLoader"))
{
pools.reserve(pool_initializers.size());
@@ -332,6 +334,8 @@ void AsyncLoader::schedule(const LoadJobSet & jobs_to_schedule)
ALLOW_ALLOCATIONS_IN_SCOPE;
scheduled_jobs.try_emplace(job);
job->scheduled(++last_job_id);
+if (log_events)
+LOG_DEBUG(log, "Schedule load job '{}' into {}", job->name, getPoolName(job->pool()));
});
}
@@ -587,6 +591,14 @@ void AsyncLoader::finish(const LoadJobPtr & job, LoadStatus status, std::excepti
else if (status == LoadStatus::CANCELED)
job->canceled(reason);
+if (log_events)
+{
+NOEXCEPT_SCOPE({
+ALLOW_ALLOCATIONS_IN_SCOPE;
+LOG_DEBUG(log, "Finish load job '{}' with status {}", job->name, magic_enum::enum_name(status));
+});
+}
Info & info = scheduled_jobs[job];
if (info.isReady())
{
@@ -666,6 +678,14 @@ void AsyncLoader::prioritize(const LoadJobPtr & job, size_t new_pool_id, std::un
job->pool_id.store(new_pool_id);
+if (log_events)
+{
+NOEXCEPT_SCOPE({
+ALLOW_ALLOCATIONS_IN_SCOPE;
+LOG_DEBUG(log, "Prioritize load job '{}': {} -> {}", job->name, old_pool.name, new_pool.name);
+});
+}
// Recurse into dependencies
for (const auto & dep : job->dependencies)
prioritize(dep, new_pool_id, lock);
@@ -770,6 +790,9 @@ void AsyncLoader::wait(std::unique_lock<std::mutex> & job_lock, const LoadJobPtr
if (job->load_status != LoadStatus::PENDING) // Shortcut just to avoid incrementing ProfileEvents
return;
+if (log_events)
+LOG_DEBUG(log, "Wait load job '{}' in {}", job->name, getPoolName(job->pool_id));
if (job->on_waiters_increment)
job->on_waiters_increment(job);
@@ -808,6 +831,20 @@ bool AsyncLoader::canWorkerLive(Pool & pool, std::unique_lock<std::mutex> &)
&& (!current_priority || *current_priority >= pool.priority);
}
+void AsyncLoader::setCurrentPriority(std::unique_lock<std::mutex> &, std::optional<Priority> priority)
+{
+if (log_events && current_priority != priority)
+{
+NOEXCEPT_SCOPE({
+ALLOW_ALLOCATIONS_IN_SCOPE;
+LOG_DEBUG(log, "Change current priority: {} -> {}",
+current_priority ? std::to_string(*current_priority) : "none",
+priority ? std::to_string(*priority) : "none");
+});
+}
+current_priority = priority;
+}
void AsyncLoader::updateCurrentPriorityAndSpawn(std::unique_lock<std::mutex> & lock)
{
// Find current priority.
@@ -818,7 +855,7 @@ void AsyncLoader::updateCurrentPriorityAndSpawn(std::unique_lock<std::mutex> & l
if (pool.isActive() && (!priority || *priority > pool.priority))
priority = pool.priority;
}
-current_priority = priority;
+setCurrentPriority(lock, priority);
// Spawn workers in all pools with current priority
for (Pool & pool : pools)
@@ -828,12 +865,14 @@ void AsyncLoader::updateCurrentPriorityAndSpawn(std::unique_lock<std::mutex> & l
}
}
-void AsyncLoader::spawn(Pool & pool, std::unique_lock<std::mutex> &)
+void AsyncLoader::spawn(Pool & pool, std::unique_lock<std::mutex> & lock)
{
+setCurrentPriority(lock, pool.priority); // canSpawnWorker() ensures this would not decrease current_priority
pool.workers++;
-current_priority = pool.priority; // canSpawnWorker() ensures this would not decrease current_priority
NOEXCEPT_SCOPE({
ALLOW_ALLOCATIONS_IN_SCOPE;
+if (log_events)
+LOG_DEBUG(log, "Spawn loader worker #{} in {}", pool.workers, pool.name);
pool.thread_pool->scheduleOrThrowOnError([this, &pool] { worker(pool); });
});
}
@@ -861,6 +900,13 @@ void AsyncLoader::worker(Pool & pool)
if (!canWorkerLive(pool, lock))
{
+if (log_events)
+{
+NOEXCEPT_SCOPE({
+ALLOW_ALLOCATIONS_IN_SCOPE;
+LOG_DEBUG(log, "Stop worker in {}", pool.name);
+});
+}
if (--pool.workers == 0)
updateCurrentPriorityAndSpawn(lock); // It will spawn lower priority workers if needed
return;
@@ -871,6 +917,14 @@ void AsyncLoader::worker(Pool & pool)
job = it->second;
pool.ready_queue.erase(it);
scheduled_jobs.find(job)->second.ready_seqno = 0; // This job is no longer in the ready queue
+if (log_events)
+{
+NOEXCEPT_SCOPE({
+ALLOW_ALLOCATIONS_IN_SCOPE;
+LOG_DEBUG(log, "Execute load job '{}' in {}", job->name, pool.name);
+});
+}
}
ALLOW_ALLOCATIONS_IN_SCOPE;
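A side effect of the hunks above worth calling out: the two places that previously assigned current_priority directly (updateCurrentPriorityAndSpawn() and spawn()) now route through the new setCurrentPriority() helper, so every priority transition is observable from a single spot. A self-contained sketch of that refactoring pattern follows; PriorityHolder, int priorities, and std::cout are illustrative simplifications, not the actual AsyncLoader internals.

#include <iostream>
#include <optional>
#include <string>

class PriorityHolder
{
    std::optional<int> current_priority; // Empty means no pool is active.
    const bool log_events = true;

public:
    // All writes funnel through one setter, so the old -> new transition
    // can be logged exactly once, and only when the value really changes.
    void setCurrentPriority(std::optional<int> priority)
    {
        if (log_events && current_priority != priority)
            std::cout << "Change current priority: "
                      << (current_priority ? std::to_string(*current_priority) : "none")
                      << " -> "
                      << (priority ? std::to_string(*priority) : "none") << '\n';
        current_priority = priority;
    }
};

int main()
{
    PriorityHolder holder;
    holder.setCurrentPriority(2);            // logs: none -> 2
    holder.setCurrentPriority(2);            // unchanged, logs nothing
    holder.setCurrentPriority(std::nullopt); // logs: 2 -> none
}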

src/Common/AsyncLoader.h

@@ -390,7 +390,7 @@ private:
};
public:
-AsyncLoader(std::vector<PoolInitializer> pool_initializers, bool log_failures_, bool log_progress_);
+AsyncLoader(std::vector<PoolInitializer> pool_initializers, bool log_failures_, bool log_progress_, bool log_events_);
// WARNING: all tasks instances should be destructed before associated AsyncLoader.
~AsyncLoader();
@@ -470,6 +470,7 @@ private:
void wait(std::unique_lock<std::mutex> & job_lock, const LoadJobPtr & job);
bool canSpawnWorker(Pool & pool, std::unique_lock<std::mutex> & lock);
bool canWorkerLive(Pool & pool, std::unique_lock<std::mutex> & lock);
+void setCurrentPriority(std::unique_lock<std::mutex> & lock, std::optional<Priority> priority);
void updateCurrentPriorityAndSpawn(std::unique_lock<std::mutex> & lock);
void spawn(Pool & pool, std::unique_lock<std::mutex> & lock);
void worker(Pool & pool);
@@ -478,6 +479,7 @@ private:
// Logging
const bool log_failures; // Worker should log all exceptions caught from job functions.
const bool log_progress; // Periodically log total progress
+const bool log_events; // Log all important events: job start/end, waits, prioritizations
LoggerPtr log;
mutable std::mutex mutex; // Guards all the fields below.
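Note the placement in the header: log_events joins the other two logging flags as a const member declared above the mutex-guarded section. Because the flags never change after construction, workers can test them without holding the mutex. A small sketch of that convention, with a hypothetical Loader type standing in for AsyncLoader:

#include <mutex>

struct Loader
{
    const bool log_events;  // Immutable after construction: safe to read without locking.
    std::mutex mutex;       // Guards the mutable fields below it.
    int scheduled_jobs = 0; // Example of state that does require the lock.

    explicit Loader(bool log_events_) : log_events(log_events_) {}

    void schedule()
    {
        std::lock_guard lock(mutex);
        ++scheduled_jobs;
        if (log_events)
        {
            // Emit the event here; reading the const flag needed no extra synchronization.
        }
    }
};

int main()
{
    Loader loader(/* log_events = */ true);
    loader.schedule();
}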

src/Common/tests/gtest_async_loader.cpp

@@ -50,7 +50,7 @@ struct AsyncLoaderTest
pcg64 rng{randomSeed()};
explicit AsyncLoaderTest(std::vector<Initializer> initializers)
-: loader(getPoolInitializers(initializers), /* log_failures = */ false, /* log_progress = */ false)
+: loader(getPoolInitializers(initializers), /* log_failures = */ false, /* log_progress = */ false, /* log_events = */ false)
{
loader.stop(); // All tests call `start()` manually to better control ordering
}

src/Interpreters/Context.cpp

@@ -2490,7 +2490,8 @@ AsyncLoader & Context::getAsyncLoader() const
}
},
/* log_failures = */ true,
-/* log_progress = */ true);
+/* log_progress = */ true,
+/* log_events = */ true);
});
return *shared->async_loader;
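With log_events hardwired to true for the server-wide loader here (the unit tests above pass false to keep test output quiet), loading a single table should produce a traceable DEBUG sequence. The message formats below come from the hunks above; the job name, pool name, priority value, and exact ordering are hypothetical:

AsyncLoader: Schedule load job 'load table db.t' into BackgrndStartup
AsyncLoader: Change current priority: none -> 2
AsyncLoader: Spawn loader worker #1 in BackgrndStartup
AsyncLoader: Execute load job 'load table db.t' in BackgrndStartup
AsyncLoader: Finish load job 'load table db.t' with status OK
AsyncLoader: Stop worker in BackgrndStartup
AsyncLoader: Change current priority: 2 -> none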