From f83132bad29fb44ad0ab0bb79664f44cb6e6fb85 Mon Sep 17 00:00:00 2001 From: taiyang-li <654010905@qq.com> Date: Tue, 1 Mar 2022 15:54:23 +0800 Subject: [PATCH] finish dev --- src/Client/ClientBase.cpp | 2 +- src/Client/LocalConnection.cpp | 80 +---------------------- src/Interpreters/ProfileEventsExt.cpp | 94 ++++++++++++++++++++++++++- src/Interpreters/ProfileEventsExt.h | 10 +-- src/Server/TCPHandler.cpp | 87 +------------------------ 5 files changed, 102 insertions(+), 171 deletions(-) diff --git a/src/Client/ClientBase.cpp b/src/Client/ClientBase.cpp index 7dfa60ad560..b93fef04be4 100644 --- a/src/Client/ClientBase.cpp +++ b/src/Client/ClientBase.cpp @@ -866,7 +866,7 @@ void ClientBase::onProfileEvents(Block & block) if (rows == 0) return; - if (server_revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_INCREMENTAL_PROFILE_EVENTS) + if (getName() == "local" || server_revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_INCREMENTAL_PROFILE_EVENTS) { const auto & array_thread_id = typeid_cast(*block.getByName("thread_id").column).getData(); const auto & names = typeid_cast(*block.getByName("name").column); diff --git a/src/Client/LocalConnection.cpp b/src/Client/LocalConnection.cpp index 08ba485d5fc..02b437d7ce6 100644 --- a/src/Client/LocalConnection.cpp +++ b/src/Client/LocalConnection.cpp @@ -8,7 +8,6 @@ #include #include #include -#include namespace DB @@ -64,84 +63,7 @@ void LocalConnection::updateProgress(const Progress & value) void LocalConnection::getProfileEvents(Block & block) { - using namespace ProfileEvents; - - static const NamesAndTypesList column_names_and_types = { - {"host_name", std::make_shared()}, - {"current_time", std::make_shared()}, - {"thread_id", std::make_shared()}, - {"type", TypeEnum}, - {"name", std::make_shared()}, - {"value", std::make_shared()}, - }; - - ColumnsWithTypeAndName temp_columns; - for (auto const & name_and_type : column_names_and_types) - temp_columns.emplace_back(name_and_type.type, name_and_type.name); - - block = Block(std::move(temp_columns)); - MutableColumns columns = block.mutateColumns(); - auto thread_group = CurrentThread::getGroup(); - auto const current_thread_id = CurrentThread::get().thread_id; - std::vector snapshots; - ThreadIdToCountersSnapshot new_snapshots; - ProfileEventsSnapshot group_snapshot; - { - auto stats = thread_group->getProfileEventsCountersAndMemoryForThreads(); - snapshots.reserve(stats.size()); - - for (auto & stat : stats) - { - auto const thread_id = stat.thread_id; - if (thread_id == current_thread_id) - continue; - auto current_time = time(nullptr); - auto previous_snapshot = last_sent_snapshots.find(thread_id); - auto increment = - previous_snapshot != last_sent_snapshots.end() - ? CountersIncrement(stat.counters, previous_snapshot->second) - : CountersIncrement(stat.counters); - snapshots.push_back(ProfileEventsSnapshot{ - thread_id, - std::move(increment), - stat.memory_usage, - current_time - }); - new_snapshots[thread_id] = std::move(stat.counters); - } - - group_snapshot.thread_id = 0; - group_snapshot.current_time = time(nullptr); - group_snapshot.memory_usage = thread_group->memory_tracker.get(); - auto group_counters = thread_group->performance_counters.getPartiallyAtomicSnapshot(); - auto prev_group_snapshot = last_sent_snapshots.find(0); - group_snapshot.counters = - prev_group_snapshot != last_sent_snapshots.end() - ? CountersIncrement(group_counters, prev_group_snapshot->second) - : CountersIncrement(group_counters); - new_snapshots[0] = std::move(group_counters); - } - last_sent_snapshots = std::move(new_snapshots); - - const String server_display_name = "localhost"; - for (auto & snapshot : snapshots) - { - dumpProfileEvents(snapshot, columns, server_display_name); - dumpMemoryTracker(snapshot, columns, server_display_name); - } - dumpProfileEvents(group_snapshot, columns, server_display_name); - dumpMemoryTracker(group_snapshot, columns, server_display_name); - - MutableColumns logs_columns; - Block curr_block; - size_t rows = 0; - - for (; state->profile_queue->tryPop(curr_block); ++rows) - { - auto curr_columns = curr_block.getColumns(); - for (size_t j = 0; j < curr_columns.size(); ++j) - columns[j]->insertRangeFrom(*curr_columns[j], 0, curr_columns[j]->size()); - } + ProfileEvents::getProfileEvents("local", state->profile_queue, block, last_sent_snapshots); } void LocalConnection::sendQuery( diff --git a/src/Interpreters/ProfileEventsExt.cpp b/src/Interpreters/ProfileEventsExt.cpp index 173df507c65..6961d70529e 100644 --- a/src/Interpreters/ProfileEventsExt.cpp +++ b/src/Interpreters/ProfileEventsExt.cpp @@ -1,6 +1,7 @@ #include "ProfileEventsExt.h" #include #include +#include #include #include #include @@ -46,8 +47,8 @@ void dumpToMapColumn(const Counters::Snapshot & counters, DB::IColumn * column, offsets.push_back(offsets.back() + size); } - -void dumpProfileEvents(ProfileEventsSnapshot const & snapshot, DB::MutableColumns & columns, String const & host_name) +/// Add records about provided non-zero ProfileEvents::Counters. +static void dumpProfileEvents(ProfileEventsSnapshot const & snapshot, DB::MutableColumns & columns, String const & host_name) { size_t rows = 0; auto & name_column = columns[NAME_COLUMN_INDEX]; @@ -76,7 +77,7 @@ void dumpProfileEvents(ProfileEventsSnapshot const & snapshot, DB::MutableColumn } } -void dumpMemoryTracker(ProfileEventsSnapshot const & snapshot, DB::MutableColumns & columns, String const & host_name) +static void dumpMemoryTracker(ProfileEventsSnapshot const & snapshot, DB::MutableColumns & columns, String const & host_name) { { size_t i = 0; @@ -90,4 +91,91 @@ void dumpMemoryTracker(ProfileEventsSnapshot const & snapshot, DB::MutableColumn } } +void getProfileEvents( + const String & server_display_name, + DB::InternalProfileEventsQueuePtr profile_queue, + DB::Block & block, + ThreadIdToCountersSnapshot & last_sent_snapshots) +{ + using namespace DB; + static const NamesAndTypesList column_names_and_types = { + {"host_name", std::make_shared()}, + {"current_time", std::make_shared()}, + {"thread_id", std::make_shared()}, + {"type", TypeEnum}, + {"name", std::make_shared()}, + {"value", std::make_shared()}, + }; + + ColumnsWithTypeAndName temp_columns; + for (auto const & name_and_type : column_names_and_types) + temp_columns.emplace_back(name_and_type.type, name_and_type.name); + + block = std::move(temp_columns); + MutableColumns columns = block.mutateColumns(); + auto thread_group = CurrentThread::getGroup(); + auto const current_thread_id = CurrentThread::get().thread_id; + std::vector snapshots; + ThreadIdToCountersSnapshot new_snapshots; + ProfileEventsSnapshot group_snapshot; + { + auto stats = thread_group->getProfileEventsCountersAndMemoryForThreads(); + snapshots.reserve(stats.size()); + + for (auto & stat : stats) + { + auto const thread_id = stat.thread_id; + if (thread_id == current_thread_id) + continue; + auto current_time = time(nullptr); + auto previous_snapshot = last_sent_snapshots.find(thread_id); + auto increment = + previous_snapshot != last_sent_snapshots.end() + ? CountersIncrement(stat.counters, previous_snapshot->second) + : CountersIncrement(stat.counters); + snapshots.push_back(ProfileEventsSnapshot{ + thread_id, + std::move(increment), + stat.memory_usage, + current_time + }); + new_snapshots[thread_id] = std::move(stat.counters); + } + + group_snapshot.thread_id = 0; + group_snapshot.current_time = time(nullptr); + group_snapshot.memory_usage = thread_group->memory_tracker.get(); + auto group_counters = thread_group->performance_counters.getPartiallyAtomicSnapshot(); + auto prev_group_snapshot = last_sent_snapshots.find(0); + group_snapshot.counters = + prev_group_snapshot != last_sent_snapshots.end() + ? CountersIncrement(group_counters, prev_group_snapshot->second) + : CountersIncrement(group_counters); + new_snapshots[0] = std::move(group_counters); + } + last_sent_snapshots = std::move(new_snapshots); + + for (auto & snapshot : snapshots) + { + dumpProfileEvents(snapshot, columns, server_display_name); + dumpMemoryTracker(snapshot, columns, server_display_name); + } + dumpProfileEvents(group_snapshot, columns, server_display_name); + dumpMemoryTracker(group_snapshot, columns, server_display_name); + + Block curr_block; + size_t rows = 0; + + for (; profile_queue->tryPop(curr_block); ++rows) + { + auto curr_columns = curr_block.getColumns(); + for (size_t j = 0; j < curr_columns.size(); ++j) + columns[j]->insertRangeFrom(*curr_columns[j], 0, curr_columns[j]->size()); + } + + bool empty = columns[0]->empty(); + if (!empty) + block.setColumns(std::move(columns)); +} + } diff --git a/src/Interpreters/ProfileEventsExt.h b/src/Interpreters/ProfileEventsExt.h index ebb6981405f..7d9fc512d15 100644 --- a/src/Interpreters/ProfileEventsExt.h +++ b/src/Interpreters/ProfileEventsExt.h @@ -1,5 +1,6 @@ #pragma once #include +#include #include #include @@ -23,10 +24,11 @@ using ThreadIdToCountersSnapshot = std::unordered_map #include #include -#include #include #include #include @@ -855,93 +854,13 @@ void TCPHandler::sendExtremes(const Block & extremes) void TCPHandler::sendProfileEvents() { - using namespace ProfileEvents; - if (client_tcp_protocol_version < DBMS_MIN_PROTOCOL_VERSION_WITH_INCREMENTAL_PROFILE_EVENTS) return; - static const NamesAndTypesList column_names_and_types = { - { "host_name", std::make_shared() }, - { "current_time", std::make_shared() }, - { "thread_id", std::make_shared() }, - { "type", TypeEnum }, - { "name", std::make_shared() }, - { "value", std::make_shared() }, - }; - - ColumnsWithTypeAndName temp_columns; - for (auto const & name_and_type : column_names_and_types) - temp_columns.emplace_back(name_and_type.type, name_and_type.name); - - Block block(std::move(temp_columns)); - - MutableColumns columns = block.mutateColumns(); - auto thread_group = CurrentThread::getGroup(); - auto const current_thread_id = CurrentThread::get().thread_id; - std::vector snapshots; - ThreadIdToCountersSnapshot new_snapshots; - ProfileEventsSnapshot group_snapshot; + Block block; + ProfileEvents::getProfileEvents(server_display_name, state.profile_queue, block, last_sent_snapshots); + if (!!block.rows()) { - auto stats = thread_group->getProfileEventsCountersAndMemoryForThreads(); - snapshots.reserve(stats.size()); - - for (auto & stat : stats) - { - auto const thread_id = stat.thread_id; - if (thread_id == current_thread_id) - continue; - auto current_time = time(nullptr); - auto previous_snapshot = last_sent_snapshots.find(thread_id); - auto increment = - previous_snapshot != last_sent_snapshots.end() - ? CountersIncrement(stat.counters, previous_snapshot->second) - : CountersIncrement(stat.counters); - snapshots.push_back(ProfileEventsSnapshot{ - thread_id, - std::move(increment), - stat.memory_usage, - current_time - }); - new_snapshots[thread_id] = std::move(stat.counters); - } - - group_snapshot.thread_id = 0; - group_snapshot.current_time = time(nullptr); - group_snapshot.memory_usage = thread_group->memory_tracker.get(); - auto group_counters = thread_group->performance_counters.getPartiallyAtomicSnapshot(); - auto prev_group_snapshot = last_sent_snapshots.find(0); - group_snapshot.counters = - prev_group_snapshot != last_sent_snapshots.end() - ? CountersIncrement(group_counters, prev_group_snapshot->second) - : CountersIncrement(group_counters); - new_snapshots[0] = std::move(group_counters); - } - last_sent_snapshots = std::move(new_snapshots); - - for (auto & snapshot : snapshots) - { - dumpProfileEvents(snapshot, columns, server_display_name); - dumpMemoryTracker(snapshot, columns, server_display_name); - } - dumpProfileEvents(group_snapshot, columns, server_display_name); - dumpMemoryTracker(group_snapshot, columns, server_display_name); - - MutableColumns logs_columns; - Block curr_block; - size_t rows = 0; - - for (; state.profile_queue->tryPop(curr_block); ++rows) - { - auto curr_columns = curr_block.getColumns(); - for (size_t j = 0; j < curr_columns.size(); ++j) - columns[j]->insertRangeFrom(*curr_columns[j], 0, curr_columns[j]->size()); - } - - bool empty = columns[0]->empty(); - if (!empty) - { - block.setColumns(std::move(columns)); - initProfileEventsBlockOutput(block); writeVarUInt(Protocol::Server::ProfileEvents, *out);