diff --git a/src/Common/AsyncLoader.cpp b/src/Common/AsyncLoader.cpp index 86edcdc8f3d..4a18bc23e17 100644 --- a/src/Common/AsyncLoader.cpp +++ b/src/Common/AsyncLoader.cpp @@ -107,8 +107,9 @@ void LoadJob::finish() finished.notify_all(); } -void LoadJob::scheduled() +void LoadJob::scheduled(UInt64 job_id_) { + job_id = job_id_; schedule_time = std::chrono::system_clock::now(); } @@ -276,7 +277,7 @@ void AsyncLoader::scheduleImpl(const LoadJobSet & input_jobs) NOEXCEPT_SCOPE({ ALLOW_ALLOCATIONS_IN_SCOPE; scheduled_jobs.try_emplace(job); - job->scheduled(); + job->scheduled(++last_job_id); }); } diff --git a/src/Common/AsyncLoader.h b/src/Common/AsyncLoader.h index 4f4bc20656b..0280237d887 100644 --- a/src/Common/AsyncLoader.h +++ b/src/Common/AsyncLoader.h @@ -50,6 +50,7 @@ public: LoadJob(LoadJobSetType && dependencies_, String name_, size_t pool_id_, Func && func_) : dependencies(std::forward(dependencies_)) , name(std::move(name_)) + , execution_pool_id(pool_id_) , pool_id(pool_id_) , func(std::forward(func_)) {} @@ -79,6 +80,7 @@ public: // Introspection using TimePoint = std::chrono::system_clock::time_point; + UInt64 jobId() const { return job_id; } TimePoint scheduleTime() const { return schedule_time; } TimePoint enqueueTime() const { return enqueue_time; } TimePoint startTime() const { return start_time; } @@ -95,10 +97,11 @@ private: void canceled(const std::exception_ptr & ptr); void finish(); - void scheduled(); + void scheduled(UInt64 job_id_); void enqueued(); void execute(size_t pool, const LoadJobPtr & self); + std::atomic job_id{0}; std::atomic execution_pool_id; std::atomic pool_id; std::function func; @@ -372,6 +375,7 @@ private: bool is_running = true; std::optional current_priority; // highest priority among active pools UInt64 last_ready_seqno = 0; // Increasing counter for ready queue keys. + UInt64 last_job_id = 0; // Increasing counter for job IDs std::unordered_map scheduled_jobs; // Full set of scheduled pending jobs along with scheduling info. std::vector pools; // Thread pools for job execution and ready queues LoadJobSet finished_jobs; // Set of finished jobs (for introspection only, until jobs are removed). diff --git a/src/Storages/System/StorageSystemAsyncLoader.cpp b/src/Storages/System/StorageSystemAsyncLoader.cpp index 32e70d31141..993ecb08a03 100644 --- a/src/Storages/System/StorageSystemAsyncLoader.cpp +++ b/src/Storages/System/StorageSystemAsyncLoader.cpp @@ -49,7 +49,8 @@ NamesAndTypesList StorageSystemAsyncLoader::getNamesAndTypes() { return { { "job", std::make_shared() }, - { "dependencies", std::make_shared(std::make_shared()) }, + { "job_id", std::make_shared() }, + { "dependencies", std::make_shared(std::make_shared()) }, { "dependencies_left", std::make_shared() }, { "status", std::make_shared(getTypeEnumValues()) }, { "is_executing", std::make_shared() }, @@ -77,11 +78,10 @@ void StorageSystemAsyncLoader::fillData(MutableColumns & res_columns, ContextPtr for (const auto & state : async_loader.getJobStates()) { - Array dependencies; dependencies.reserve(state.job->dependencies.size()); for (const auto & dep : state.job->dependencies) - dependencies.emplace_back(dep->name); + dependencies.emplace_back(dep->jobId()); TimePoint started = state.job->startTime(); TimePoint finished = state.job->finishTime(); @@ -112,6 +112,7 @@ void StorageSystemAsyncLoader::fillData(MutableColumns & res_columns, ContextPtr size_t i = 0; res_columns[i++]->insert(state.job->name); + res_columns[i++]->insert(state.job->jobId()); res_columns[i++]->insert(dependencies); res_columns[i++]->insert(state.dependencies_left); res_columns[i++]->insert(static_cast(state.job->status()));