Fix cores approximation

This commit is contained in:
Dmitry Novik 2021-09-15 18:45:43 +03:00
parent 4c6b3c40f2
commit 7e3caf96be
4 changed files with 68 additions and 24 deletions

View File

@ -671,6 +671,7 @@ void ClientBase::onProfileEvents(Block & block)
return;
const auto & array_thread_id = typeid_cast<const ColumnUInt64 &>(*block.getByName("thread_id").column).getData();
const auto & names = typeid_cast<const ColumnString &>(*block.getByName("name").column);
const auto & host_names = typeid_cast<const ColumnString &>(*block.getByName("host_name").column);
const auto & array_values = typeid_cast<const ColumnUInt64 &>(*block.getByName("value").column).getData();
auto const * user_time_name = ProfileEvents::getName(ProfileEvents::UserTimeMicroseconds);
@ -679,16 +680,18 @@ void ClientBase::onProfileEvents(Block & block)
for (size_t i = 0; i < block.rows(); ++i)
{
auto thread_id = array_thread_id[i];
progress_indication.addThreadIdToList(thread_id);
auto host_name = host_names.getDataAt(i).toString();
if (thread_id != 0)
progress_indication.addThreadIdToList(host_name, thread_id);
auto event_name = names.getDataAt(i);
auto value = array_values[i];
if (event_name == user_time_name)
{
progress_indication.updateThreadUserTime(thread_id, value);
progress_indication.updateThreadUserTime(host_name, thread_id, value);
}
else if (event_name == system_time_name)
{
progress_indication.updateThreadSystemTime(thread_id, value);
progress_indication.updateThreadSystemTime(host_name, thread_id, value);
}
}
}

View File

@ -1,4 +1,5 @@
#include "ProgressIndication.h"
#include <cstddef>
#include <numeric>
#include <cmath>
#include <IO/WriteBufferFromFileDescriptor.h>
@ -8,6 +9,11 @@
#include <Databases/DatabaseMemory.h>
namespace
{
constexpr UInt64 ZERO = 0;
}
namespace DB
{
@ -47,29 +53,53 @@ void ProgressIndication::setFileProgressCallback(ContextMutablePtr context, bool
});
}
void ProgressIndication::addThreadIdToList(UInt64 thread_id)
void ProgressIndication::addThreadIdToList(String const & host, UInt64 thread_id)
{
if (thread_times.contains(thread_id))
auto & thread_to_times = thread_times[host];
if (thread_to_times.contains(thread_id))
return;
thread_times[thread_id] = {};
thread_to_times[thread_id] = {};
}
void ProgressIndication::updateThreadUserTime(UInt64 thread_id, UInt64 value)
void ProgressIndication::updateThreadUserTime(String const & host, UInt64 thread_id, UInt64 value)
{
thread_times[thread_id].user_ms = value;
thread_times[host][thread_id].user_ms = value;
}
void ProgressIndication::updateThreadSystemTime(UInt64 thread_id, UInt64 value)
void ProgressIndication::updateThreadSystemTime(String const & host, UInt64 thread_id, UInt64 value)
{
thread_times[thread_id].system_ms = value;
thread_times[host][thread_id].system_ms = value;
}
UInt64 ProgressIndication::getAccumulatedThreadTime() const
size_t ProgressIndication::getUsedThreadsCount() const
{
return std::accumulate(thread_times.cbegin(), thread_times.cend(), static_cast<UInt64>(0),
[](UInt64 acc, auto const & elem)
return std::accumulate(thread_times.cbegin(), thread_times.cend(), 0,
[] (size_t acc, auto const & threads)
{
return acc + elem.second.user_ms + elem.second.system_ms;
return acc + threads.second.size();
});
}
UInt64 ProgressIndication::getApproximateCoresNumber() const
{
return std::accumulate(thread_times.cbegin(), thread_times.cend(), ZERO,
[](UInt64 acc, auto const & threads)
{
auto total_time = std::accumulate(threads.second.cbegin(), threads.second.cend(), ZERO,
[] (UInt64 temp, auto const & elem)
{
if (elem.first == 0)
return temp;
return temp + elem.second.user_ms + elem.second.system_ms;
});
// Zero thread_id represents thread group which execute query
// (including thread of TCPHandler).
auto const & accumulated_time = threads.second.find(ZERO)->second;
// Performance events of TCPHandler thread are not transmitted, but
// we can calculate it's working time which shows how long the query
// is being processed.
auto io_time = accumulated_time.user_ms + accumulated_time.system_ms - total_time;
return acc + (total_time + io_time - 1) / io_time;
});
}
@ -93,9 +123,7 @@ void ProgressIndication::writeFinalProgress()
{
std::cout << "\nUsed threads to process: " << used_threads;
auto elapsed_ms = watch.elapsedMicroseconds();
auto accumulated_thread_times = getAccumulatedThreadTime();
auto approximate_core_number = (accumulated_thread_times + elapsed_ms - 1) / elapsed_ms;
auto approximate_core_number = getApproximateCoresNumber();
if (approximate_core_number != 0)
std::cout << " and cores: " << approximate_core_number << ".";
else

View File

@ -1,5 +1,6 @@
#pragma once
#include <unordered_map>
#include <unordered_set>
#include <IO/Progress.h>
#include <Interpreters/Context.h>
@ -43,17 +44,17 @@ public:
/// How much seconds passed since query execution start.
double elapsedSeconds() const { return watch.elapsedSeconds(); }
void addThreadIdToList(UInt64 thread_id);
void addThreadIdToList(String const & host, UInt64 thread_id);
void updateThreadUserTime(UInt64 thread_id, UInt64 value);
void updateThreadUserTime(String const & host, UInt64 thread_id, UInt64 value);
void updateThreadSystemTime(UInt64 thread_id, UInt64 value);
void updateThreadSystemTime(String const & host, UInt64 thread_id, UInt64 value);
private:
size_t getUsedThreadsCount() const { return thread_times.size(); }
size_t getUsedThreadsCount() const;
UInt64 getAccumulatedThreadTime() const;
UInt64 getApproximateCoresNumber() const;
/// This flag controls whether to show the progress bar. We start showing it after
/// the query has been executing for 0.5 seconds, and is still less than half complete.
@ -78,7 +79,10 @@ private:
UInt64 system_ms = 0;
};
std::unordered_map<UInt64, ThreadTime> thread_times;
using ThreadIdToTimeMap = std::unordered_map<UInt64, ThreadTime>;
using HostToThreadTimesMap = std::unordered_map<String, ThreadIdToTimeMap>;
HostToThreadTimesMap thread_times;
};
}

View File

@ -932,19 +932,28 @@ void TCPHandler::sendProfileEvents()
MutableColumns columns = block.mutateColumns();
auto thread_group = CurrentThread::getGroup();
auto const current_thread_id = CurrentThread::get().thread_id;
std::vector<ProfileEventsSnapshot> snapshots;
ProfileEventsSnapshot group_snapshot;
{
std::lock_guard guard(thread_group->mutex);
for (auto * thread : thread_group->threads)
{
auto const thread_id = thread->thread_id;
if (thread_id == current_thread_id)
continue;
auto current_time = time(nullptr);
auto counters = thread->performance_counters.getPartiallyAtomicSnapshot();
auto metric = thread->memory_tracker.getMetric();
auto const thread_id = CurrentThread::get().thread_id;
snapshots.push_back(ProfileEventsSnapshot{thread_id, std::move(counters), metric, current_time});
}
group_snapshot.counters = thread_group->performance_counters.getPartiallyAtomicSnapshot();
group_snapshot.metric = thread_group->memory_tracker.getMetric();
group_snapshot.current_time = time(nullptr);
}
dumpProfileEvents(group_snapshot.counters, columns, server_display_name, group_snapshot.current_time, 0);
dumpMemoryTracker(group_snapshot.metric, columns, server_display_name, 0);
for (auto & snapshot : snapshots)
{
dumpProfileEvents(