fix system.async_loader

This commit is contained in:
serxa 2023-05-31 18:08:03 +00:00
parent 303e3f00c9
commit d364cd067f

View File

@ -56,8 +56,9 @@ NamesAndTypesList StorageSystemAsyncLoader::getNamesAndTypes()
{ "is_blocked", std::make_shared<DataTypeUInt8>() },
{ "is_ready", std::make_shared<DataTypeUInt8>() },
{ "elapsed", std::make_shared<DataTypeFloat64>()},
{ "pool_id", std::make_shared<DataTypeInt64>() },
{ "pool", std::make_shared<DataTypeString>() },
{ "priority", std::make_shared<DataTypeInt64>() },
{ "initial_priority", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt64>()) },
{ "ready_seqno", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt64>()) },
{ "waiters", std::make_shared<DataTypeUInt64>() },
{ "exception", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>()) },
@ -72,7 +73,9 @@ void StorageSystemAsyncLoader::fillData(MutableColumns & res_columns, ContextPtr
{
TimePoint now = std::chrono::system_clock::now();
for (const auto & state : context->getAsyncLoader().getJobStates())
AsyncLoader & async_loader = context->getAsyncLoader();
for (const auto & state : async_loader.getJobStates())
{
Array dependencies;
@ -86,13 +89,9 @@ void StorageSystemAsyncLoader::fillData(MutableColumns & res_columns, ContextPtr
TimeDuration elapsed = started != TimePoint{} ? last - started : TimeDuration{0};
double elapsed_sec = std::chrono::duration_cast<std::chrono::nanoseconds>(elapsed).count() * 1e-9;
Field initial_priority;
if (state.initial_priority)
initial_priority = *state.initial_priority;
Field ready_seqno;
if (state.ready_seqno)
ready_seqno = *state.ready_seqno;
ready_seqno = state.ready_seqno;
Field exception;
if (state.job->exception())
@ -120,8 +119,9 @@ void StorageSystemAsyncLoader::fillData(MutableColumns & res_columns, ContextPtr
res_columns[i++]->insert(state.is_blocked);
res_columns[i++]->insert(state.is_ready);
res_columns[i++]->insert(elapsed_sec);
res_columns[i++]->insert(state.job->priority());
res_columns[i++]->insert(initial_priority);
res_columns[i++]->insert(state.job->pool());
res_columns[i++]->insert(async_loader.getPoolName(state.job->pool()));
res_columns[i++]->insert(async_loader.getPoolPriority(state.job->pool()).value);
res_columns[i++]->insert(ready_seqno);
res_columns[i++]->insert(state.job->waitersCount());
res_columns[i++]->insert(exception);