diff --git a/src/Common/ThreadStatus.h b/src/Common/ThreadStatus.h index 9e8d8f637b8..dbd0b4e5664 100644 --- a/src/Common/ThreadStatus.h +++ b/src/Common/ThreadStatus.h @@ -15,6 +15,7 @@ #include #include #include +#include namespace Poco @@ -41,7 +42,7 @@ struct ViewRuntimeData; class QueryViewsLog; using InternalTextLogsQueuePtr = std::shared_ptr; using InternalTextLogsQueueWeakPtr = std::weak_ptr; - +using ThreadStatusPtr = ThreadStatus *; /** Thread group is a collection of threads dedicated to single task * (query or other process like background merge). @@ -66,6 +67,7 @@ public: std::function fatal_error_callback; std::vector thread_ids; + std::unordered_set threads; /// The first thread created this thread group UInt64 master_thread_id = 0; diff --git a/src/Interpreters/ThreadStatusExt.cpp b/src/Interpreters/ThreadStatusExt.cpp index 465b8e31b08..81a745ef430 100644 --- a/src/Interpreters/ThreadStatusExt.cpp +++ b/src/Interpreters/ThreadStatusExt.cpp @@ -123,6 +123,7 @@ void ThreadStatus::setupState(const ThreadGroupStatusPtr & thread_group_) /// NOTE: thread may be attached multiple times if it is reused from a thread pool. thread_group->thread_ids.emplace_back(thread_id); + thread_group->threads.insert(this); logs_queue_ptr = thread_group->logs_queue_ptr; fatal_error_callback = thread_group->fatal_error_callback; diff --git a/src/Server/TCPHandler.cpp b/src/Server/TCPHandler.cpp index fbb5c755142..4c6d01c564c 100644 --- a/src/Server/TCPHandler.cpp +++ b/src/Server/TCPHandler.cpp @@ -666,6 +666,7 @@ void TCPHandler::processOrdinaryQueryWithProcessors() /// Some time passed and there is a progress. after_send_progress.restart(); sendProgress(); + sendProfileEvents(); } sendLogs(); @@ -691,6 +692,7 @@ void TCPHandler::processOrdinaryQueryWithProcessors() sendProfileInfo(executor.getProfileInfo()); sendProgress(); sendLogs(); + sendProfileEvents(); } if (state.is_connection_closed) @@ -867,12 +869,12 @@ namespace MemoryTracker * memoryTracker, MutableColumns & columns, String const & host_name, - time_t current_time, UInt64 thread_id) { auto metric = memoryTracker->getMetric(); if (metric == CurrentMetrics::end()) return; + time_t current_time = time(nullptr); size_t i = 0; columns[i++]->insertData(host_name.data(), host_name.size()); @@ -890,13 +892,6 @@ namespace void TCPHandler::sendProfileEvents() { - auto thread_group = CurrentThread::getGroup(); - auto const counters_snapshot = CurrentThread::getProfileEvents().getPartiallyAtomicSnapshot(); - auto current_time = time(nullptr); - auto * memory_tracker = CurrentThread::getMemoryTracker(); - - auto const thread_id = CurrentThread::get().thread_id; - auto profile_event_type = std::make_shared( DataTypeEnum8::Values { @@ -920,9 +915,17 @@ void TCPHandler::sendProfileEvents() Block block(std::move(temp_columns)); MutableColumns columns = block.mutateColumns(); - dumpProfileEvents(counters_snapshot, columns, server_display_name, current_time, thread_id); - dumpMemoryTracker(memory_tracker, columns, server_display_name, current_time, thread_id); + auto thread_group = CurrentThread::getGroup(); + for (auto * thread : thread_group->threads) + { + auto const counters_snapshot = thread->performance_counters.getPartiallyAtomicSnapshot(); + auto current_time = time(nullptr); + auto * memory_tracker = &thread->memory_tracker; + auto const thread_id = CurrentThread::get().thread_id; + dumpProfileEvents(counters_snapshot, columns, server_display_name, current_time, thread_id); + dumpMemoryTracker(memory_tracker, columns, server_display_name, thread_id); + } block.setColumns(std::move(columns)); initProfileEventsBlockOutput(block);