Send profile events from all threads of current group

This commit is contained in:
Dmitry Novik 2021-08-30 18:35:25 +03:00
parent 362bcb2f66
commit e9b1e05461
3 changed files with 17 additions and 11 deletions

View File

@ -15,6 +15,7 @@
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <unordered_set>
namespace Poco
@ -41,7 +42,7 @@ struct ViewRuntimeData;
class QueryViewsLog;
using InternalTextLogsQueuePtr = std::shared_ptr<InternalTextLogsQueue>;
using InternalTextLogsQueueWeakPtr = std::weak_ptr<InternalTextLogsQueue>;
using ThreadStatusPtr = ThreadStatus *;
/** Thread group is a collection of threads dedicated to single task
* (query or other process like background merge).
@ -66,6 +67,7 @@ public:
std::function<void()> fatal_error_callback;
std::vector<UInt64> thread_ids;
std::unordered_set<ThreadStatusPtr> threads;
/// The first thread created this thread group
UInt64 master_thread_id = 0;

View File

@ -123,6 +123,7 @@ void ThreadStatus::setupState(const ThreadGroupStatusPtr & thread_group_)
/// NOTE: thread may be attached multiple times if it is reused from a thread pool.
thread_group->thread_ids.emplace_back(thread_id);
thread_group->threads.insert(this);
logs_queue_ptr = thread_group->logs_queue_ptr;
fatal_error_callback = thread_group->fatal_error_callback;

View File

@ -666,6 +666,7 @@ void TCPHandler::processOrdinaryQueryWithProcessors()
/// Some time passed and there is a progress.
after_send_progress.restart();
sendProgress();
sendProfileEvents();
}
sendLogs();
@ -691,6 +692,7 @@ void TCPHandler::processOrdinaryQueryWithProcessors()
sendProfileInfo(executor.getProfileInfo());
sendProgress();
sendLogs();
sendProfileEvents();
}
if (state.is_connection_closed)
@ -867,12 +869,12 @@ namespace
MemoryTracker * memoryTracker,
MutableColumns & columns,
String const & host_name,
time_t current_time,
UInt64 thread_id)
{
auto metric = memoryTracker->getMetric();
if (metric == CurrentMetrics::end())
return;
time_t current_time = time(nullptr);
size_t i = 0;
columns[i++]->insertData(host_name.data(), host_name.size());
@ -890,13 +892,6 @@ namespace
void TCPHandler::sendProfileEvents()
{
auto thread_group = CurrentThread::getGroup();
auto const counters_snapshot = CurrentThread::getProfileEvents().getPartiallyAtomicSnapshot();
auto current_time = time(nullptr);
auto * memory_tracker = CurrentThread::getMemoryTracker();
auto const thread_id = CurrentThread::get().thread_id;
auto profile_event_type = std::make_shared<DataTypeEnum8>(
DataTypeEnum8::Values
{
@ -920,9 +915,17 @@ void TCPHandler::sendProfileEvents()
Block block(std::move(temp_columns));
MutableColumns columns = block.mutateColumns();
dumpProfileEvents(counters_snapshot, columns, server_display_name, current_time, thread_id);
dumpMemoryTracker(memory_tracker, columns, server_display_name, current_time, thread_id);
auto thread_group = CurrentThread::getGroup();
for (auto * thread : thread_group->threads)
{
auto const counters_snapshot = thread->performance_counters.getPartiallyAtomicSnapshot();
auto current_time = time(nullptr);
auto * memory_tracker = &thread->memory_tracker;
auto const thread_id = CurrentThread::get().thread_id;
dumpProfileEvents(counters_snapshot, columns, server_display_name, current_time, thread_id);
dumpMemoryTracker(memory_tracker, columns, server_display_name, thread_id);
}
block.setColumns(std::move(columns));
initProfileEventsBlockOutput(block);