Compatibility

This commit is contained in:
Alexey Milovidov 2021-12-29 19:57:40 +03:00
parent 422061ce71
commit c1297d2431

View File

@ -800,56 +800,59 @@ void ClientBase::onProfileEvents(Block & block)
if (rows == 0)
return;
const auto & array_thread_id = typeid_cast<const ColumnUInt64 &>(*block.getByName("thread_id").column).getData();
const auto & names = typeid_cast<const ColumnString &>(*block.getByName("name").column);
const auto & host_names = typeid_cast<const ColumnString &>(*block.getByName("host_name").column);
const auto & array_values = typeid_cast<const ColumnInt64 &>(*block.getByName("value").column).getData();
const auto * user_time_name = ProfileEvents::getName(ProfileEvents::UserTimeMicroseconds);
const auto * system_time_name = ProfileEvents::getName(ProfileEvents::SystemTimeMicroseconds);
HostToThreadTimesMap thread_times;
for (size_t i = 0; i < rows; ++i)
if (server_revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_INCREMENTAL_PROFILE_EVENTS)
{
auto thread_id = array_thread_id[i];
auto host_name = host_names.getDataAt(i).toString();
if (thread_id != 0)
progress_indication.addThreadIdToList(host_name, thread_id);
auto event_name = names.getDataAt(i);
auto value = array_values[i];
if (event_name == user_time_name)
{
thread_times[host_name][thread_id].user_ms = value;
}
else if (event_name == system_time_name)
{
thread_times[host_name][thread_id].system_ms = value;
}
else if (event_name == MemoryTracker::USAGE_EVENT_NAME)
{
thread_times[host_name][thread_id].memory_usage = value;
}
}
auto elapsed_time = profile_events.watch.elapsedMicroseconds();
progress_indication.updateThreadEventData(thread_times, elapsed_time);
const auto & array_thread_id = typeid_cast<const ColumnUInt64 &>(*block.getByName("thread_id").column).getData();
const auto & names = typeid_cast<const ColumnString &>(*block.getByName("name").column);
const auto & host_names = typeid_cast<const ColumnString &>(*block.getByName("host_name").column);
const auto & array_values = typeid_cast<const ColumnInt64 &>(*block.getByName("value").column).getData();
if (profile_events.print)
{
if (profile_events.watch.elapsedMilliseconds() >= profile_events.delay_ms)
{
initLogsOutputStream();
progress_indication.clearProgressOutput();
logs_out_stream->writeProfileEvents(block);
logs_out_stream->flush();
const auto * user_time_name = ProfileEvents::getName(ProfileEvents::UserTimeMicroseconds);
const auto * system_time_name = ProfileEvents::getName(ProfileEvents::SystemTimeMicroseconds);
profile_events.last_block = {};
}
else
HostToThreadTimesMap thread_times;
for (size_t i = 0; i < rows; ++i)
{
incrementProfileEventsBlock(profile_events.last_block, block);
auto thread_id = array_thread_id[i];
auto host_name = host_names.getDataAt(i).toString();
if (thread_id != 0)
progress_indication.addThreadIdToList(host_name, thread_id);
auto event_name = names.getDataAt(i);
auto value = array_values[i];
if (event_name == user_time_name)
{
thread_times[host_name][thread_id].user_ms = value;
}
else if (event_name == system_time_name)
{
thread_times[host_name][thread_id].system_ms = value;
}
else if (event_name == MemoryTracker::USAGE_EVENT_NAME)
{
thread_times[host_name][thread_id].memory_usage = value;
}
}
auto elapsed_time = profile_events.watch.elapsedMicroseconds();
progress_indication.updateThreadEventData(thread_times, elapsed_time);
if (profile_events.print)
{
if (profile_events.watch.elapsedMilliseconds() >= profile_events.delay_ms)
{
initLogsOutputStream();
progress_indication.clearProgressOutput();
logs_out_stream->writeProfileEvents(block);
logs_out_stream->flush();
profile_events.last_block = {};
}
else
{
incrementProfileEventsBlock(profile_events.last_block, block);
}
}
profile_events.watch.restart();
}
profile_events.watch.restart();
}