diff --git a/src/Client/ClientBase.cpp b/src/Client/ClientBase.cpp index 5e0e11d103e..6a05ebd7c1b 100644 --- a/src/Client/ClientBase.cpp +++ b/src/Client/ClientBase.cpp @@ -671,6 +671,7 @@ void ClientBase::onProfileEvents(Block & block) return; const auto & array_thread_id = typeid_cast(*block.getByName("thread_id").column).getData(); const auto & names = typeid_cast(*block.getByName("name").column); + const auto & host_names = typeid_cast(*block.getByName("host_name").column); const auto & array_values = typeid_cast(*block.getByName("value").column).getData(); auto const * user_time_name = ProfileEvents::getName(ProfileEvents::UserTimeMicroseconds); @@ -679,16 +680,18 @@ void ClientBase::onProfileEvents(Block & block) for (size_t i = 0; i < block.rows(); ++i) { auto thread_id = array_thread_id[i]; - progress_indication.addThreadIdToList(thread_id); + auto host_name = host_names.getDataAt(i).toString(); + if (thread_id != 0) + progress_indication.addThreadIdToList(host_name, thread_id); auto event_name = names.getDataAt(i); auto value = array_values[i]; if (event_name == user_time_name) { - progress_indication.updateThreadUserTime(thread_id, value); + progress_indication.updateThreadUserTime(host_name, thread_id, value); } else if (event_name == system_time_name) { - progress_indication.updateThreadSystemTime(thread_id, value); + progress_indication.updateThreadSystemTime(host_name, thread_id, value); } } } diff --git a/src/Common/ProgressIndication.cpp b/src/Common/ProgressIndication.cpp index ceb039b15f5..b06df1bba15 100644 --- a/src/Common/ProgressIndication.cpp +++ b/src/Common/ProgressIndication.cpp @@ -1,4 +1,5 @@ #include "ProgressIndication.h" +#include #include #include #include @@ -8,6 +9,11 @@ #include +namespace +{ + constexpr UInt64 ZERO = 0; +} + namespace DB { @@ -47,29 +53,53 @@ void ProgressIndication::setFileProgressCallback(ContextMutablePtr context, bool }); } -void ProgressIndication::addThreadIdToList(UInt64 thread_id) +void ProgressIndication::addThreadIdToList(String const & host, UInt64 thread_id) { - if (thread_times.contains(thread_id)) + auto & thread_to_times = thread_times[host]; + if (thread_to_times.contains(thread_id)) return; - thread_times[thread_id] = {}; + thread_to_times[thread_id] = {}; } -void ProgressIndication::updateThreadUserTime(UInt64 thread_id, UInt64 value) +void ProgressIndication::updateThreadUserTime(String const & host, UInt64 thread_id, UInt64 value) { - thread_times[thread_id].user_ms = value; + thread_times[host][thread_id].user_ms = value; } -void ProgressIndication::updateThreadSystemTime(UInt64 thread_id, UInt64 value) +void ProgressIndication::updateThreadSystemTime(String const & host, UInt64 thread_id, UInt64 value) { - thread_times[thread_id].system_ms = value; + thread_times[host][thread_id].system_ms = value; } -UInt64 ProgressIndication::getAccumulatedThreadTime() const +size_t ProgressIndication::getUsedThreadsCount() const { - return std::accumulate(thread_times.cbegin(), thread_times.cend(), static_cast(0), - [](UInt64 acc, auto const & elem) + return std::accumulate(thread_times.cbegin(), thread_times.cend(), 0, + [] (size_t acc, auto const & threads) { - return acc + elem.second.user_ms + elem.second.system_ms; + return acc + threads.second.size(); + }); +} + +UInt64 ProgressIndication::getApproximateCoresNumber() const +{ + return std::accumulate(thread_times.cbegin(), thread_times.cend(), ZERO, + [](UInt64 acc, auto const & threads) + { + auto total_time = std::accumulate(threads.second.cbegin(), threads.second.cend(), ZERO, + [] (UInt64 temp, auto const & elem) + { + if (elem.first == 0) + return temp; + return temp + elem.second.user_ms + elem.second.system_ms; + }); + // Zero thread_id represents thread group which execute query + // (including thread of TCPHandler). + auto const & accumulated_time = threads.second.find(ZERO)->second; + // Performance events of TCPHandler thread are not transmitted, but + // we can calculate it's working time which shows how long the query + // is being processed. + auto io_time = accumulated_time.user_ms + accumulated_time.system_ms - total_time; + return acc + (total_time + io_time - 1) / io_time; }); } @@ -93,9 +123,7 @@ void ProgressIndication::writeFinalProgress() { std::cout << "\nUsed threads to process: " << used_threads; - auto elapsed_ms = watch.elapsedMicroseconds(); - auto accumulated_thread_times = getAccumulatedThreadTime(); - auto approximate_core_number = (accumulated_thread_times + elapsed_ms - 1) / elapsed_ms; + auto approximate_core_number = getApproximateCoresNumber(); if (approximate_core_number != 0) std::cout << " and cores: " << approximate_core_number << "."; else diff --git a/src/Common/ProgressIndication.h b/src/Common/ProgressIndication.h index 7517853f74d..f1d7d214f4f 100644 --- a/src/Common/ProgressIndication.h +++ b/src/Common/ProgressIndication.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include @@ -43,17 +44,17 @@ public: /// How much seconds passed since query execution start. double elapsedSeconds() const { return watch.elapsedSeconds(); } - void addThreadIdToList(UInt64 thread_id); + void addThreadIdToList(String const & host, UInt64 thread_id); - void updateThreadUserTime(UInt64 thread_id, UInt64 value); + void updateThreadUserTime(String const & host, UInt64 thread_id, UInt64 value); - void updateThreadSystemTime(UInt64 thread_id, UInt64 value); + void updateThreadSystemTime(String const & host, UInt64 thread_id, UInt64 value); private: - size_t getUsedThreadsCount() const { return thread_times.size(); } + size_t getUsedThreadsCount() const; - UInt64 getAccumulatedThreadTime() const; + UInt64 getApproximateCoresNumber() const; /// This flag controls whether to show the progress bar. We start showing it after /// the query has been executing for 0.5 seconds, and is still less than half complete. @@ -78,7 +79,10 @@ private: UInt64 system_ms = 0; }; - std::unordered_map thread_times; + using ThreadIdToTimeMap = std::unordered_map; + using HostToThreadTimesMap = std::unordered_map; + + HostToThreadTimesMap thread_times; }; } diff --git a/src/Server/TCPHandler.cpp b/src/Server/TCPHandler.cpp index 581cba91356..c24bf599527 100644 --- a/src/Server/TCPHandler.cpp +++ b/src/Server/TCPHandler.cpp @@ -932,19 +932,28 @@ void TCPHandler::sendProfileEvents() MutableColumns columns = block.mutateColumns(); auto thread_group = CurrentThread::getGroup(); + auto const current_thread_id = CurrentThread::get().thread_id; std::vector snapshots; + ProfileEventsSnapshot group_snapshot; { std::lock_guard guard(thread_group->mutex); for (auto * thread : thread_group->threads) { + auto const thread_id = thread->thread_id; + if (thread_id == current_thread_id) + continue; auto current_time = time(nullptr); auto counters = thread->performance_counters.getPartiallyAtomicSnapshot(); auto metric = thread->memory_tracker.getMetric(); - auto const thread_id = CurrentThread::get().thread_id; snapshots.push_back(ProfileEventsSnapshot{thread_id, std::move(counters), metric, current_time}); } + group_snapshot.counters = thread_group->performance_counters.getPartiallyAtomicSnapshot(); + group_snapshot.metric = thread_group->memory_tracker.getMetric(); + group_snapshot.current_time = time(nullptr); } + dumpProfileEvents(group_snapshot.counters, columns, server_display_name, group_snapshot.current_time, 0); + dumpMemoryTracker(group_snapshot.metric, columns, server_display_name, 0); for (auto & snapshot : snapshots) { dumpProfileEvents(