diff --git a/src/Common/AsyncLoader.cpp b/src/Common/AsyncLoader.cpp index 19dfce1aa7a..3bec30893b9 100644 --- a/src/Common/AsyncLoader.cpp +++ b/src/Common/AsyncLoader.cpp @@ -2,6 +2,7 @@ #include #include +#include #include #include #include @@ -201,9 +202,10 @@ void LoadTask::remove() } } -AsyncLoader::AsyncLoader(std::vector pool_initializers, bool log_failures_, bool log_progress_) +AsyncLoader::AsyncLoader(std::vector pool_initializers, bool log_failures_, bool log_progress_, bool log_events_) : log_failures(log_failures_) , log_progress(log_progress_) + , log_events(log_events_) , log(getLogger("AsyncLoader")) { pools.reserve(pool_initializers.size()); @@ -332,6 +334,8 @@ void AsyncLoader::schedule(const LoadJobSet & jobs_to_schedule) ALLOW_ALLOCATIONS_IN_SCOPE; scheduled_jobs.try_emplace(job); job->scheduled(++last_job_id); + if (log_events) + LOG_DEBUG(log, "Schedule load job '{}' into {}", job->name, getPoolName(job->pool())); }); } @@ -587,6 +591,14 @@ void AsyncLoader::finish(const LoadJobPtr & job, LoadStatus status, std::excepti else if (status == LoadStatus::CANCELED) job->canceled(reason); + if (log_events) + { + NOEXCEPT_SCOPE({ + ALLOW_ALLOCATIONS_IN_SCOPE; + LOG_DEBUG(log, "Finish load job '{}' with status {}", job->name, magic_enum::enum_name(status)); + }); + } + Info & info = scheduled_jobs[job]; if (info.isReady()) { @@ -666,6 +678,14 @@ void AsyncLoader::prioritize(const LoadJobPtr & job, size_t new_pool_id, std::un job->pool_id.store(new_pool_id); + if (log_events) + { + NOEXCEPT_SCOPE({ + ALLOW_ALLOCATIONS_IN_SCOPE; + LOG_DEBUG(log, "Prioritize load job '{}': {} -> {}", job->name, old_pool.name, new_pool.name); + }); + } + // Recurse into dependencies for (const auto & dep : job->dependencies) prioritize(dep, new_pool_id, lock); @@ -770,6 +790,9 @@ void AsyncLoader::wait(std::unique_lock & job_lock, const LoadJobPtr if (job->load_status != LoadStatus::PENDING) // Shortcut just to avoid incrementing ProfileEvents return; + if (log_events) + LOG_DEBUG(log, "Wait load job '{}' in {}", job->name, getPoolName(job->pool_id)); + if (job->on_waiters_increment) job->on_waiters_increment(job); @@ -808,6 +831,20 @@ bool AsyncLoader::canWorkerLive(Pool & pool, std::unique_lock &) && (!current_priority || *current_priority >= pool.priority); } +void AsyncLoader::setCurrentPriority(std::unique_lock &, std::optional priority) +{ + if (log_events && current_priority != priority) + { + NOEXCEPT_SCOPE({ + ALLOW_ALLOCATIONS_IN_SCOPE; + LOG_DEBUG(log, "Change current priority: {} -> {}", + current_priority ? std::to_string(*current_priority) : "none", + priority ? std::to_string(*priority) : "none"); + }); + } + current_priority = priority; +} + void AsyncLoader::updateCurrentPriorityAndSpawn(std::unique_lock & lock) { // Find current priority. @@ -818,7 +855,7 @@ void AsyncLoader::updateCurrentPriorityAndSpawn(std::unique_lock & l if (pool.isActive() && (!priority || *priority > pool.priority)) priority = pool.priority; } - current_priority = priority; + setCurrentPriority(lock, priority); // Spawn workers in all pools with current priority for (Pool & pool : pools) @@ -828,12 +865,14 @@ void AsyncLoader::updateCurrentPriorityAndSpawn(std::unique_lock & l } } -void AsyncLoader::spawn(Pool & pool, std::unique_lock &) +void AsyncLoader::spawn(Pool & pool, std::unique_lock & lock) { + setCurrentPriority(lock, pool.priority); // canSpawnWorker() ensures this would not decrease current_priority pool.workers++; - current_priority = pool.priority; // canSpawnWorker() ensures this would not decrease current_priority NOEXCEPT_SCOPE({ ALLOW_ALLOCATIONS_IN_SCOPE; + if (log_events) + LOG_DEBUG(log, "Spawn loader worker #{} in {}", pool.workers, pool.name); pool.thread_pool->scheduleOrThrowOnError([this, &pool] { worker(pool); }); }); } @@ -861,6 +900,13 @@ void AsyncLoader::worker(Pool & pool) if (!canWorkerLive(pool, lock)) { + if (log_events) + { + NOEXCEPT_SCOPE({ + ALLOW_ALLOCATIONS_IN_SCOPE; + LOG_DEBUG(log, "Stop worker in {}", pool.name); + }); + } if (--pool.workers == 0) updateCurrentPriorityAndSpawn(lock); // It will spawn lower priority workers if needed return; @@ -871,6 +917,14 @@ void AsyncLoader::worker(Pool & pool) job = it->second; pool.ready_queue.erase(it); scheduled_jobs.find(job)->second.ready_seqno = 0; // This job is no longer in the ready queue + + if (log_events) + { + NOEXCEPT_SCOPE({ + ALLOW_ALLOCATIONS_IN_SCOPE; + LOG_DEBUG(log, "Execute load job '{}' in {}", job->name, pool.name); + }); + } } ALLOW_ALLOCATIONS_IN_SCOPE; diff --git a/src/Common/AsyncLoader.h b/src/Common/AsyncLoader.h index 3f81a36aa96..b254c9f2482 100644 --- a/src/Common/AsyncLoader.h +++ b/src/Common/AsyncLoader.h @@ -390,7 +390,7 @@ private: }; public: - AsyncLoader(std::vector pool_initializers, bool log_failures_, bool log_progress_); + AsyncLoader(std::vector pool_initializers, bool log_failures_, bool log_progress_, bool log_events_); // WARNING: all tasks instances should be destructed before associated AsyncLoader. ~AsyncLoader(); @@ -470,6 +470,7 @@ private: void wait(std::unique_lock & job_lock, const LoadJobPtr & job); bool canSpawnWorker(Pool & pool, std::unique_lock & lock); bool canWorkerLive(Pool & pool, std::unique_lock & lock); + void setCurrentPriority(std::unique_lock & lock, std::optional priority); void updateCurrentPriorityAndSpawn(std::unique_lock & lock); void spawn(Pool & pool, std::unique_lock & lock); void worker(Pool & pool); @@ -478,6 +479,7 @@ private: // Logging const bool log_failures; // Worker should log all exceptions caught from job functions. const bool log_progress; // Periodically log total progress + const bool log_events; // Log all important events: job start/end, waits, prioritizations LoggerPtr log; mutable std::mutex mutex; // Guards all the fields below. diff --git a/src/Common/tests/gtest_async_loader.cpp b/src/Common/tests/gtest_async_loader.cpp index 62a27f259cc..174997ddf14 100644 --- a/src/Common/tests/gtest_async_loader.cpp +++ b/src/Common/tests/gtest_async_loader.cpp @@ -50,7 +50,7 @@ struct AsyncLoaderTest pcg64 rng{randomSeed()}; explicit AsyncLoaderTest(std::vector initializers) - : loader(getPoolInitializers(initializers), /* log_failures = */ false, /* log_progress = */ false) + : loader(getPoolInitializers(initializers), /* log_failures = */ false, /* log_progress = */ false, /* log_events = */ false) { loader.stop(); // All tests call `start()` manually to better control ordering } diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index c6bc10f3d50..bde5c9c4c1b 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -2490,7 +2490,8 @@ AsyncLoader & Context::getAsyncLoader() const } }, /* log_failures = */ true, - /* log_progress = */ true); + /* log_progress = */ true, + /* log_events = */ true); }); return *shared->async_loader;