introduce unique job_id in async_loader

This commit is contained in:
serxa 2023-06-03 15:16:21 +00:00
parent 5509749e43
commit 443ec12f66
3 changed files with 12 additions and 6 deletions

View File

@ -107,8 +107,9 @@ void LoadJob::finish()
finished.notify_all();
}
void LoadJob::scheduled()
void LoadJob::scheduled(UInt64 job_id_)
{
job_id = job_id_;
schedule_time = std::chrono::system_clock::now();
}
@ -276,7 +277,7 @@ void AsyncLoader::scheduleImpl(const LoadJobSet & input_jobs)
NOEXCEPT_SCOPE({
ALLOW_ALLOCATIONS_IN_SCOPE;
scheduled_jobs.try_emplace(job);
job->scheduled();
job->scheduled(++last_job_id);
});
}

View File

@ -50,6 +50,7 @@ public:
LoadJob(LoadJobSetType && dependencies_, String name_, size_t pool_id_, Func && func_)
: dependencies(std::forward<LoadJobSetType>(dependencies_))
, name(std::move(name_))
, execution_pool_id(pool_id_)
, pool_id(pool_id_)
, func(std::forward<Func>(func_))
{}
@ -79,6 +80,7 @@ public:
// Introspection
using TimePoint = std::chrono::system_clock::time_point;
UInt64 jobId() const { return job_id; }
TimePoint scheduleTime() const { return schedule_time; }
TimePoint enqueueTime() const { return enqueue_time; }
TimePoint startTime() const { return start_time; }
@ -95,10 +97,11 @@ private:
void canceled(const std::exception_ptr & ptr);
void finish();
void scheduled();
void scheduled(UInt64 job_id_);
void enqueued();
void execute(size_t pool, const LoadJobPtr & self);
std::atomic<UInt64> job_id{0};
std::atomic<size_t> execution_pool_id;
std::atomic<size_t> pool_id;
std::function<void(const LoadJobPtr & self)> func;
@ -372,6 +375,7 @@ private:
bool is_running = true;
std::optional<Priority> current_priority; // highest priority among active pools
UInt64 last_ready_seqno = 0; // Increasing counter for ready queue keys.
UInt64 last_job_id = 0; // Increasing counter for job IDs
std::unordered_map<LoadJobPtr, Info> scheduled_jobs; // Full set of scheduled pending jobs along with scheduling info.
std::vector<Pool> pools; // Thread pools for job execution and ready queues
LoadJobSet finished_jobs; // Set of finished jobs (for introspection only, until jobs are removed).

View File

@ -49,7 +49,8 @@ NamesAndTypesList StorageSystemAsyncLoader::getNamesAndTypes()
{
return {
{ "job", std::make_shared<DataTypeString>() },
{ "dependencies", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()) },
{ "job_id", std::make_shared<DataTypeUInt64>() },
{ "dependencies", std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt64>()) },
{ "dependencies_left", std::make_shared<DataTypeUInt64>() },
{ "status", std::make_shared<DataTypeEnum8>(getTypeEnumValues<LoadStatus>()) },
{ "is_executing", std::make_shared<DataTypeUInt8>() },
@ -77,11 +78,10 @@ void StorageSystemAsyncLoader::fillData(MutableColumns & res_columns, ContextPtr
for (const auto & state : async_loader.getJobStates())
{
Array dependencies;
dependencies.reserve(state.job->dependencies.size());
for (const auto & dep : state.job->dependencies)
dependencies.emplace_back(dep->name);
dependencies.emplace_back(dep->jobId());
TimePoint started = state.job->startTime();
TimePoint finished = state.job->finishTime();
@ -112,6 +112,7 @@ void StorageSystemAsyncLoader::fillData(MutableColumns & res_columns, ContextPtr
size_t i = 0;
res_columns[i++]->insert(state.job->name);
res_columns[i++]->insert(state.job->jobId());
res_columns[i++]->insert(dependencies);
res_columns[i++]->insert(state.dependencies_left);
res_columns[i++]->insert(static_cast<Int8>(state.job->status()));