finish dev

This commit is contained in:
taiyang-li 2022-03-01 15:54:23 +08:00
parent b31440c77a
commit f83132bad2
5 changed files with 102 additions and 171 deletions

View File

@ -866,7 +866,7 @@ void ClientBase::onProfileEvents(Block & block)
if (rows == 0)
return;
if (server_revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_INCREMENTAL_PROFILE_EVENTS)
if (getName() == "local" || server_revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_INCREMENTAL_PROFILE_EVENTS)
{
const auto & array_thread_id = typeid_cast<const ColumnUInt64 &>(*block.getByName("thread_id").column).getData();
const auto & names = typeid_cast<const ColumnString &>(*block.getByName("name").column);

View File

@ -8,7 +8,6 @@
#include <Core/Protocol.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <Interpreters/ProfileEventsExt.h>
namespace DB
@ -64,84 +63,7 @@ void LocalConnection::updateProgress(const Progress & value)
void LocalConnection::getProfileEvents(Block & block)
{
using namespace ProfileEvents;
static const NamesAndTypesList column_names_and_types = {
{"host_name", std::make_shared<DataTypeString>()},
{"current_time", std::make_shared<DataTypeDateTime>()},
{"thread_id", std::make_shared<DataTypeUInt64>()},
{"type", TypeEnum},
{"name", std::make_shared<DataTypeString>()},
{"value", std::make_shared<DataTypeInt64>()},
};
ColumnsWithTypeAndName temp_columns;
for (auto const & name_and_type : column_names_and_types)
temp_columns.emplace_back(name_and_type.type, name_and_type.name);
block = Block(std::move(temp_columns));
MutableColumns columns = block.mutateColumns();
auto thread_group = CurrentThread::getGroup();
auto const current_thread_id = CurrentThread::get().thread_id;
std::vector<ProfileEventsSnapshot> snapshots;
ThreadIdToCountersSnapshot new_snapshots;
ProfileEventsSnapshot group_snapshot;
{
auto stats = thread_group->getProfileEventsCountersAndMemoryForThreads();
snapshots.reserve(stats.size());
for (auto & stat : stats)
{
auto const thread_id = stat.thread_id;
if (thread_id == current_thread_id)
continue;
auto current_time = time(nullptr);
auto previous_snapshot = last_sent_snapshots.find(thread_id);
auto increment =
previous_snapshot != last_sent_snapshots.end()
? CountersIncrement(stat.counters, previous_snapshot->second)
: CountersIncrement(stat.counters);
snapshots.push_back(ProfileEventsSnapshot{
thread_id,
std::move(increment),
stat.memory_usage,
current_time
});
new_snapshots[thread_id] = std::move(stat.counters);
}
group_snapshot.thread_id = 0;
group_snapshot.current_time = time(nullptr);
group_snapshot.memory_usage = thread_group->memory_tracker.get();
auto group_counters = thread_group->performance_counters.getPartiallyAtomicSnapshot();
auto prev_group_snapshot = last_sent_snapshots.find(0);
group_snapshot.counters =
prev_group_snapshot != last_sent_snapshots.end()
? CountersIncrement(group_counters, prev_group_snapshot->second)
: CountersIncrement(group_counters);
new_snapshots[0] = std::move(group_counters);
}
last_sent_snapshots = std::move(new_snapshots);
const String server_display_name = "localhost";
for (auto & snapshot : snapshots)
{
dumpProfileEvents(snapshot, columns, server_display_name);
dumpMemoryTracker(snapshot, columns, server_display_name);
}
dumpProfileEvents(group_snapshot, columns, server_display_name);
dumpMemoryTracker(group_snapshot, columns, server_display_name);
MutableColumns logs_columns;
Block curr_block;
size_t rows = 0;
for (; state->profile_queue->tryPop(curr_block); ++rows)
{
auto curr_columns = curr_block.getColumns();
for (size_t j = 0; j < curr_columns.size(); ++j)
columns[j]->insertRangeFrom(*curr_columns[j], 0, curr_columns[j]->size());
}
ProfileEvents::getProfileEvents("local", state->profile_queue, block, last_sent_snapshots);
}
void LocalConnection::sendQuery(

View File

@ -1,6 +1,7 @@
#include "ProfileEventsExt.h"
#include <Common/typeid_cast.h>
#include <Common/MemoryTracker.h>
#include <Common/CurrentThread.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnArray.h>
@ -46,8 +47,8 @@ void dumpToMapColumn(const Counters::Snapshot & counters, DB::IColumn * column,
offsets.push_back(offsets.back() + size);
}
void dumpProfileEvents(ProfileEventsSnapshot const & snapshot, DB::MutableColumns & columns, String const & host_name)
/// Add records about provided non-zero ProfileEvents::Counters.
static void dumpProfileEvents(ProfileEventsSnapshot const & snapshot, DB::MutableColumns & columns, String const & host_name)
{
size_t rows = 0;
auto & name_column = columns[NAME_COLUMN_INDEX];
@ -76,7 +77,7 @@ void dumpProfileEvents(ProfileEventsSnapshot const & snapshot, DB::MutableColumn
}
}
void dumpMemoryTracker(ProfileEventsSnapshot const & snapshot, DB::MutableColumns & columns, String const & host_name)
static void dumpMemoryTracker(ProfileEventsSnapshot const & snapshot, DB::MutableColumns & columns, String const & host_name)
{
{
size_t i = 0;
@ -90,4 +91,91 @@ void dumpMemoryTracker(ProfileEventsSnapshot const & snapshot, DB::MutableColumn
}
}
void getProfileEvents(
const String & server_display_name,
DB::InternalProfileEventsQueuePtr profile_queue,
DB::Block & block,
ThreadIdToCountersSnapshot & last_sent_snapshots)
{
using namespace DB;
static const NamesAndTypesList column_names_and_types = {
{"host_name", std::make_shared<DataTypeString>()},
{"current_time", std::make_shared<DataTypeDateTime>()},
{"thread_id", std::make_shared<DataTypeUInt64>()},
{"type", TypeEnum},
{"name", std::make_shared<DataTypeString>()},
{"value", std::make_shared<DataTypeInt64>()},
};
ColumnsWithTypeAndName temp_columns;
for (auto const & name_and_type : column_names_and_types)
temp_columns.emplace_back(name_and_type.type, name_and_type.name);
block = std::move(temp_columns);
MutableColumns columns = block.mutateColumns();
auto thread_group = CurrentThread::getGroup();
auto const current_thread_id = CurrentThread::get().thread_id;
std::vector<ProfileEventsSnapshot> snapshots;
ThreadIdToCountersSnapshot new_snapshots;
ProfileEventsSnapshot group_snapshot;
{
auto stats = thread_group->getProfileEventsCountersAndMemoryForThreads();
snapshots.reserve(stats.size());
for (auto & stat : stats)
{
auto const thread_id = stat.thread_id;
if (thread_id == current_thread_id)
continue;
auto current_time = time(nullptr);
auto previous_snapshot = last_sent_snapshots.find(thread_id);
auto increment =
previous_snapshot != last_sent_snapshots.end()
? CountersIncrement(stat.counters, previous_snapshot->second)
: CountersIncrement(stat.counters);
snapshots.push_back(ProfileEventsSnapshot{
thread_id,
std::move(increment),
stat.memory_usage,
current_time
});
new_snapshots[thread_id] = std::move(stat.counters);
}
group_snapshot.thread_id = 0;
group_snapshot.current_time = time(nullptr);
group_snapshot.memory_usage = thread_group->memory_tracker.get();
auto group_counters = thread_group->performance_counters.getPartiallyAtomicSnapshot();
auto prev_group_snapshot = last_sent_snapshots.find(0);
group_snapshot.counters =
prev_group_snapshot != last_sent_snapshots.end()
? CountersIncrement(group_counters, prev_group_snapshot->second)
: CountersIncrement(group_counters);
new_snapshots[0] = std::move(group_counters);
}
last_sent_snapshots = std::move(new_snapshots);
for (auto & snapshot : snapshots)
{
dumpProfileEvents(snapshot, columns, server_display_name);
dumpMemoryTracker(snapshot, columns, server_display_name);
}
dumpProfileEvents(group_snapshot, columns, server_display_name);
dumpMemoryTracker(group_snapshot, columns, server_display_name);
Block curr_block;
size_t rows = 0;
for (; profile_queue->tryPop(curr_block); ++rows)
{
auto curr_columns = curr_block.getColumns();
for (size_t j = 0; j < curr_columns.size(); ++j)
columns[j]->insertRangeFrom(*curr_columns[j], 0, curr_columns[j]->size());
}
bool empty = columns[0]->empty();
if (!empty)
block.setColumns(std::move(columns));
}
}

View File

@ -1,5 +1,6 @@
#pragma once
#include <Common/ProfileEvents.h>
#include <Common/ThreadStatus.h>
#include <DataTypes/DataTypeEnum.h>
#include <Columns/IColumn.h>
@ -23,10 +24,11 @@ using ThreadIdToCountersSnapshot = std::unordered_map<UInt64, Counters::Snapshot
/// Dumps profile events to columns Map(String, UInt64)
void dumpToMapColumn(const Counters::Snapshot & counters, DB::IColumn * column, bool nonzero_only = true);
/// Add records about provided non-zero ProfileEvents::Counters.
void dumpProfileEvents(ProfileEventsSnapshot const & snapshot, DB::MutableColumns & columns, String const & host_name);
void dumpMemoryTracker(ProfileEventsSnapshot const & snapshot, DB::MutableColumns & columns, String const & host_name);
void getProfileEvents(
const String & server_display_name,
DB::InternalProfileEventsQueuePtr profile_queue,
DB::Block & block,
ThreadIdToCountersSnapshot & last_sent_snapshots);
/// This is for ProfileEvents packets.
enum Type : int8_t

View File

@ -31,7 +31,6 @@
#include <Interpreters/InternalTextLogsQueue.h>
#include <Interpreters/OpenTelemetrySpanLog.h>
#include <Interpreters/Session.h>
#include <Interpreters/ProfileEventsExt.h>
#include <Server/TCPServer.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/MergeTreeDataPartUUID.h>
@ -855,93 +854,13 @@ void TCPHandler::sendExtremes(const Block & extremes)
void TCPHandler::sendProfileEvents()
{
using namespace ProfileEvents;
if (client_tcp_protocol_version < DBMS_MIN_PROTOCOL_VERSION_WITH_INCREMENTAL_PROFILE_EVENTS)
return;
static const NamesAndTypesList column_names_and_types = {
{ "host_name", std::make_shared<DataTypeString>() },
{ "current_time", std::make_shared<DataTypeDateTime>() },
{ "thread_id", std::make_shared<DataTypeUInt64>() },
{ "type", TypeEnum },
{ "name", std::make_shared<DataTypeString>() },
{ "value", std::make_shared<DataTypeInt64>() },
};
ColumnsWithTypeAndName temp_columns;
for (auto const & name_and_type : column_names_and_types)
temp_columns.emplace_back(name_and_type.type, name_and_type.name);
Block block(std::move(temp_columns));
MutableColumns columns = block.mutateColumns();
auto thread_group = CurrentThread::getGroup();
auto const current_thread_id = CurrentThread::get().thread_id;
std::vector<ProfileEventsSnapshot> snapshots;
ThreadIdToCountersSnapshot new_snapshots;
ProfileEventsSnapshot group_snapshot;
Block block;
ProfileEvents::getProfileEvents(server_display_name, state.profile_queue, block, last_sent_snapshots);
if (!!block.rows())
{
auto stats = thread_group->getProfileEventsCountersAndMemoryForThreads();
snapshots.reserve(stats.size());
for (auto & stat : stats)
{
auto const thread_id = stat.thread_id;
if (thread_id == current_thread_id)
continue;
auto current_time = time(nullptr);
auto previous_snapshot = last_sent_snapshots.find(thread_id);
auto increment =
previous_snapshot != last_sent_snapshots.end()
? CountersIncrement(stat.counters, previous_snapshot->second)
: CountersIncrement(stat.counters);
snapshots.push_back(ProfileEventsSnapshot{
thread_id,
std::move(increment),
stat.memory_usage,
current_time
});
new_snapshots[thread_id] = std::move(stat.counters);
}
group_snapshot.thread_id = 0;
group_snapshot.current_time = time(nullptr);
group_snapshot.memory_usage = thread_group->memory_tracker.get();
auto group_counters = thread_group->performance_counters.getPartiallyAtomicSnapshot();
auto prev_group_snapshot = last_sent_snapshots.find(0);
group_snapshot.counters =
prev_group_snapshot != last_sent_snapshots.end()
? CountersIncrement(group_counters, prev_group_snapshot->second)
: CountersIncrement(group_counters);
new_snapshots[0] = std::move(group_counters);
}
last_sent_snapshots = std::move(new_snapshots);
for (auto & snapshot : snapshots)
{
dumpProfileEvents(snapshot, columns, server_display_name);
dumpMemoryTracker(snapshot, columns, server_display_name);
}
dumpProfileEvents(group_snapshot, columns, server_display_name);
dumpMemoryTracker(group_snapshot, columns, server_display_name);
MutableColumns logs_columns;
Block curr_block;
size_t rows = 0;
for (; state.profile_queue->tryPop(curr_block); ++rows)
{
auto curr_columns = curr_block.getColumns();
for (size_t j = 0; j < curr_columns.size(); ++j)
columns[j]->insertRangeFrom(*curr_columns[j], 0, curr_columns[j]->size());
}
bool empty = columns[0]->empty();
if (!empty)
{
block.setColumns(std::move(columns));
initProfileEventsBlockOutput(block);
writeVarUInt(Protocol::Server::ProfileEvents, *out);