mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-24 08:32:02 +00:00
Merge pull request #34605 from bigo-sg/add_metric_for_local
Add cpu/mem metric for clickhouse-local
This commit is contained in:
commit
cdb9a05229
@ -411,7 +411,8 @@ void LocalServer::setupUsers()
|
||||
void LocalServer::connect()
|
||||
{
|
||||
connection_parameters = ConnectionParameters(config());
|
||||
connection = LocalConnection::createConnection(connection_parameters, global_context, need_render_progress);
|
||||
connection = LocalConnection::createConnection(
|
||||
connection_parameters, global_context, need_render_progress, need_render_profile_events, server_display_name);
|
||||
}
|
||||
|
||||
|
||||
|
@ -867,7 +867,7 @@ void ClientBase::onProfileEvents(Block & block)
|
||||
if (rows == 0)
|
||||
return;
|
||||
|
||||
if (server_revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_INCREMENTAL_PROFILE_EVENTS)
|
||||
if (getName() == "local" || server_revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_INCREMENTAL_PROFILE_EVENTS)
|
||||
{
|
||||
const auto & array_thread_id = typeid_cast<const ColumnUInt64 &>(*block.getByName("thread_id").column).getData();
|
||||
const auto & names = typeid_cast<const ColumnString &>(*block.getByName("name").column);
|
||||
|
@ -219,6 +219,7 @@ protected:
|
||||
|
||||
ProgressIndication progress_indication;
|
||||
bool need_render_progress = true;
|
||||
bool need_render_profile_events = true;
|
||||
bool written_first_block = false;
|
||||
size_t processed_rows = 0; /// How many rows have been read or written.
|
||||
|
||||
|
@ -6,6 +6,8 @@
|
||||
#include <Processors/Executors/PushingAsyncPipelineExecutor.h>
|
||||
#include <Storages/IStorage.h>
|
||||
#include <Core/Protocol.h>
|
||||
#include <DataTypes/DataTypesNumber.h>
|
||||
#include <DataTypes/DataTypeString.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -18,10 +20,12 @@ namespace ErrorCodes
|
||||
extern const int NOT_IMPLEMENTED;
|
||||
}
|
||||
|
||||
LocalConnection::LocalConnection(ContextPtr context_, bool send_progress_)
|
||||
LocalConnection::LocalConnection(ContextPtr context_, bool send_progress_, bool send_profile_events_, const String & server_display_name_)
|
||||
: WithContext(context_)
|
||||
, session(getContext(), ClientInfo::Interface::LOCAL)
|
||||
, send_progress(send_progress_)
|
||||
, send_profile_events(send_profile_events_)
|
||||
, server_display_name(server_display_name_)
|
||||
{
|
||||
/// Authenticate and create a context to execute queries.
|
||||
session.authenticate("default", "", Poco::Net::SocketAddress{});
|
||||
@ -58,6 +62,11 @@ void LocalConnection::updateProgress(const Progress & value)
|
||||
state->progress.incrementPiecewiseAtomically(value);
|
||||
}
|
||||
|
||||
void LocalConnection::getProfileEvents(Block & block)
|
||||
{
|
||||
ProfileEvents::getProfileEvents(server_display_name, state->profile_queue, block, last_sent_snapshots);
|
||||
}
|
||||
|
||||
void LocalConnection::sendQuery(
|
||||
const ConnectionTimeouts &,
|
||||
const String & query,
|
||||
@ -77,18 +86,23 @@ void LocalConnection::sendQuery(
|
||||
if (!current_database.empty())
|
||||
query_context->setCurrentDatabase(current_database);
|
||||
|
||||
CurrentThread::QueryScope query_scope_holder(query_context);
|
||||
|
||||
state.reset();
|
||||
state.emplace();
|
||||
|
||||
state->query_id = query_id;
|
||||
state->query = query;
|
||||
state->query_scope_holder = std::make_unique<CurrentThread::QueryScope>(query_context);
|
||||
state->stage = QueryProcessingStage::Enum(stage);
|
||||
state->profile_queue = std::make_shared<InternalProfileEventsQueue>(std::numeric_limits<int>::max());
|
||||
CurrentThread::attachInternalProfileEventsQueue(state->profile_queue);
|
||||
|
||||
if (send_progress)
|
||||
state->after_send_progress.restart();
|
||||
|
||||
if (send_profile_events)
|
||||
state->after_send_profile_events.restart();
|
||||
|
||||
next_packet_type.reset();
|
||||
|
||||
try
|
||||
@ -231,6 +245,16 @@ bool LocalConnection::poll(size_t)
|
||||
return true;
|
||||
}
|
||||
|
||||
if (send_profile_events && (state->after_send_profile_events.elapsedMicroseconds() >= query_context->getSettingsRef().interactive_delay))
|
||||
{
|
||||
Block block;
|
||||
state->after_send_profile_events.restart();
|
||||
next_packet_type = Protocol::Server::ProfileEvents;
|
||||
getProfileEvents(block);
|
||||
state->block.emplace(std::move(block));
|
||||
return true;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
pollImpl();
|
||||
@ -459,9 +483,14 @@ void LocalConnection::sendMergeTreeReadTaskResponse(const PartitionReadResponse
|
||||
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented");
|
||||
}
|
||||
|
||||
ServerConnectionPtr LocalConnection::createConnection(const ConnectionParameters &, ContextPtr current_context, bool send_progress)
|
||||
ServerConnectionPtr LocalConnection::createConnection(
|
||||
const ConnectionParameters &,
|
||||
ContextPtr current_context,
|
||||
bool send_progress,
|
||||
bool send_profile_events,
|
||||
const String & server_display_name)
|
||||
{
|
||||
return std::make_unique<LocalConnection>(current_context, send_progress);
|
||||
return std::make_unique<LocalConnection>(current_context, send_progress, send_profile_events, server_display_name);
|
||||
}
|
||||
|
||||
|
||||
|
@ -5,6 +5,7 @@
|
||||
#include <QueryPipeline/BlockIO.h>
|
||||
#include <IO/TimeoutSetter.h>
|
||||
#include <Interpreters/Session.h>
|
||||
#include <Interpreters/ProfileEventsExt.h>
|
||||
#include <Storages/ColumnsDescription.h>
|
||||
|
||||
|
||||
@ -29,6 +30,7 @@ struct LocalQueryState
|
||||
std::unique_ptr<PullingAsyncPipelineExecutor> executor;
|
||||
std::unique_ptr<PushingPipelineExecutor> pushing_executor;
|
||||
std::unique_ptr<PushingAsyncPipelineExecutor> pushing_async_executor;
|
||||
InternalProfileEventsQueuePtr profile_queue;
|
||||
|
||||
std::optional<Exception> exception;
|
||||
|
||||
@ -50,19 +52,28 @@ struct LocalQueryState
|
||||
Progress progress;
|
||||
/// Time after the last check to stop the request and send the progress.
|
||||
Stopwatch after_send_progress;
|
||||
Stopwatch after_send_profile_events;
|
||||
|
||||
std::unique_ptr<CurrentThread::QueryScope> query_scope_holder;
|
||||
};
|
||||
|
||||
|
||||
class LocalConnection : public IServerConnection, WithContext
|
||||
{
|
||||
public:
|
||||
explicit LocalConnection(ContextPtr context_, bool send_progress_ = false);
|
||||
explicit LocalConnection(
|
||||
ContextPtr context_, bool send_progress_ = false, bool send_profile_events_ = false, const String & server_display_name_ = "");
|
||||
|
||||
~LocalConnection() override;
|
||||
|
||||
IServerConnection::Type getConnectionType() const override { return IServerConnection::Type::LOCAL; }
|
||||
|
||||
static ServerConnectionPtr createConnection(const ConnectionParameters & connection_parameters, ContextPtr current_context, bool send_progress = false);
|
||||
static ServerConnectionPtr createConnection(
|
||||
const ConnectionParameters & connection_parameters,
|
||||
ContextPtr current_context,
|
||||
bool send_progress = false,
|
||||
bool send_profile_events = false,
|
||||
const String & server_display_name = "");
|
||||
|
||||
void setDefaultDatabase(const String & database) override;
|
||||
|
||||
@ -129,12 +140,16 @@ private:
|
||||
|
||||
void updateProgress(const Progress & value);
|
||||
|
||||
void getProfileEvents(Block & block);
|
||||
|
||||
bool pollImpl();
|
||||
|
||||
ContextMutablePtr query_context;
|
||||
Session session;
|
||||
|
||||
bool send_progress;
|
||||
bool send_profile_events;
|
||||
String server_display_name;
|
||||
String description = "clickhouse-local";
|
||||
|
||||
std::optional<LocalQueryState> state;
|
||||
@ -144,5 +159,7 @@ private:
|
||||
std::optional<UInt64> next_packet_type;
|
||||
|
||||
String current_database;
|
||||
|
||||
ProfileEvents::ThreadIdToCountersSnapshot last_sent_snapshots;
|
||||
};
|
||||
}
|
||||
|
@ -103,6 +103,7 @@ void Suggest::load(ContextPtr context, const ConnectionParameters & connection_p
|
||||
{
|
||||
loading_thread = std::thread([context=Context::createCopy(context), connection_parameters, suggestion_limit, this]
|
||||
{
|
||||
ThreadStatus thread_status;
|
||||
for (size_t retry = 0; retry < 10; ++retry)
|
||||
{
|
||||
try
|
||||
|
@ -1,5 +1,7 @@
|
||||
#include "ProfileEventsExt.h"
|
||||
#include <Common/typeid_cast.h>
|
||||
#include <Common/MemoryTracker.h>
|
||||
#include <Common/CurrentThread.h>
|
||||
#include <Columns/ColumnsNumber.h>
|
||||
#include <Columns/ColumnString.h>
|
||||
#include <Columns/ColumnArray.h>
|
||||
@ -36,7 +38,7 @@ void dumpToMapColumn(const Counters::Snapshot & counters, DB::IColumn * column,
|
||||
if (nonzero_only && 0 == value)
|
||||
continue;
|
||||
|
||||
const char * desc = ProfileEvents::getName(event);
|
||||
const char * desc = getName(event);
|
||||
key_column.insertData(desc, strlen(desc));
|
||||
value_column.insert(value);
|
||||
size++;
|
||||
@ -45,4 +47,133 @@ void dumpToMapColumn(const Counters::Snapshot & counters, DB::IColumn * column,
|
||||
offsets.push_back(offsets.back() + size);
|
||||
}
|
||||
|
||||
/// Add records about provided non-zero ProfileEvents::Counters.
|
||||
static void dumpProfileEvents(ProfileEventsSnapshot const & snapshot, DB::MutableColumns & columns, String const & host_name)
|
||||
{
|
||||
size_t rows = 0;
|
||||
auto & name_column = columns[NAME_COLUMN_INDEX];
|
||||
auto & value_column = columns[VALUE_COLUMN_INDEX];
|
||||
for (Event event = 0; event < Counters::num_counters; ++event)
|
||||
{
|
||||
Int64 value = snapshot.counters[event];
|
||||
|
||||
if (value == 0)
|
||||
continue;
|
||||
|
||||
const char * desc = getName(event);
|
||||
name_column->insertData(desc, strlen(desc));
|
||||
value_column->insert(value);
|
||||
rows++;
|
||||
}
|
||||
|
||||
// Fill the rest of the columns with data
|
||||
for (size_t row = 0; row < rows; ++row)
|
||||
{
|
||||
size_t i = 0;
|
||||
columns[i++]->insertData(host_name.data(), host_name.size());
|
||||
columns[i++]->insert(UInt64(snapshot.current_time));
|
||||
columns[i++]->insert(UInt64{snapshot.thread_id});
|
||||
columns[i++]->insert(Type::INCREMENT);
|
||||
}
|
||||
}
|
||||
|
||||
static void dumpMemoryTracker(ProfileEventsSnapshot const & snapshot, DB::MutableColumns & columns, String const & host_name)
|
||||
{
|
||||
size_t i = 0;
|
||||
columns[i++]->insertData(host_name.data(), host_name.size());
|
||||
columns[i++]->insert(UInt64(snapshot.current_time));
|
||||
columns[i++]->insert(UInt64{snapshot.thread_id});
|
||||
columns[i++]->insert(Type::GAUGE);
|
||||
|
||||
columns[i++]->insertData(MemoryTracker::USAGE_EVENT_NAME, strlen(MemoryTracker::USAGE_EVENT_NAME));
|
||||
columns[i++]->insert(snapshot.memory_usage);
|
||||
}
|
||||
|
||||
void getProfileEvents(
|
||||
const String & server_display_name,
|
||||
DB::InternalProfileEventsQueuePtr profile_queue,
|
||||
DB::Block & block,
|
||||
ThreadIdToCountersSnapshot & last_sent_snapshots)
|
||||
{
|
||||
using namespace DB;
|
||||
static const NamesAndTypesList column_names_and_types = {
|
||||
{"host_name", std::make_shared<DataTypeString>()},
|
||||
{"current_time", std::make_shared<DataTypeDateTime>()},
|
||||
{"thread_id", std::make_shared<DataTypeUInt64>()},
|
||||
{"type", TypeEnum},
|
||||
{"name", std::make_shared<DataTypeString>()},
|
||||
{"value", std::make_shared<DataTypeInt64>()},
|
||||
};
|
||||
|
||||
ColumnsWithTypeAndName temp_columns;
|
||||
for (auto const & name_and_type : column_names_and_types)
|
||||
temp_columns.emplace_back(name_and_type.type, name_and_type.name);
|
||||
|
||||
block = std::move(temp_columns);
|
||||
MutableColumns columns = block.mutateColumns();
|
||||
auto thread_group = CurrentThread::getGroup();
|
||||
auto const current_thread_id = CurrentThread::get().thread_id;
|
||||
std::vector<ProfileEventsSnapshot> snapshots;
|
||||
ThreadIdToCountersSnapshot new_snapshots;
|
||||
ProfileEventsSnapshot group_snapshot;
|
||||
{
|
||||
auto stats = thread_group->getProfileEventsCountersAndMemoryForThreads();
|
||||
snapshots.reserve(stats.size());
|
||||
|
||||
for (auto & stat : stats)
|
||||
{
|
||||
auto const thread_id = stat.thread_id;
|
||||
if (thread_id == current_thread_id)
|
||||
continue;
|
||||
auto current_time = time(nullptr);
|
||||
auto previous_snapshot = last_sent_snapshots.find(thread_id);
|
||||
auto increment =
|
||||
previous_snapshot != last_sent_snapshots.end()
|
||||
? CountersIncrement(stat.counters, previous_snapshot->second)
|
||||
: CountersIncrement(stat.counters);
|
||||
snapshots.push_back(ProfileEventsSnapshot{
|
||||
thread_id,
|
||||
std::move(increment),
|
||||
stat.memory_usage,
|
||||
current_time
|
||||
});
|
||||
new_snapshots[thread_id] = std::move(stat.counters);
|
||||
}
|
||||
|
||||
group_snapshot.thread_id = 0;
|
||||
group_snapshot.current_time = time(nullptr);
|
||||
group_snapshot.memory_usage = thread_group->memory_tracker.get();
|
||||
auto group_counters = thread_group->performance_counters.getPartiallyAtomicSnapshot();
|
||||
auto prev_group_snapshot = last_sent_snapshots.find(0);
|
||||
group_snapshot.counters =
|
||||
prev_group_snapshot != last_sent_snapshots.end()
|
||||
? CountersIncrement(group_counters, prev_group_snapshot->second)
|
||||
: CountersIncrement(group_counters);
|
||||
new_snapshots[0] = std::move(group_counters);
|
||||
}
|
||||
last_sent_snapshots = std::move(new_snapshots);
|
||||
|
||||
for (auto & snapshot : snapshots)
|
||||
{
|
||||
dumpProfileEvents(snapshot, columns, server_display_name);
|
||||
dumpMemoryTracker(snapshot, columns, server_display_name);
|
||||
}
|
||||
dumpProfileEvents(group_snapshot, columns, server_display_name);
|
||||
dumpMemoryTracker(group_snapshot, columns, server_display_name);
|
||||
|
||||
Block curr_block;
|
||||
size_t rows = 0;
|
||||
|
||||
for (; profile_queue->tryPop(curr_block); ++rows)
|
||||
{
|
||||
auto curr_columns = curr_block.getColumns();
|
||||
for (size_t j = 0; j < curr_columns.size(); ++j)
|
||||
columns[j]->insertRangeFrom(*curr_columns[j], 0, curr_columns[j]->size());
|
||||
}
|
||||
|
||||
bool empty = columns[0]->empty();
|
||||
if (!empty)
|
||||
block.setColumns(std::move(columns));
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -1,5 +1,6 @@
|
||||
#pragma once
|
||||
#include <Common/ProfileEvents.h>
|
||||
#include <Common/ThreadStatus.h>
|
||||
#include <DataTypes/DataTypeEnum.h>
|
||||
#include <Columns/IColumn.h>
|
||||
|
||||
@ -7,9 +8,28 @@
|
||||
namespace ProfileEvents
|
||||
{
|
||||
|
||||
constexpr size_t NAME_COLUMN_INDEX = 4;
|
||||
constexpr size_t VALUE_COLUMN_INDEX = 5;
|
||||
|
||||
struct ProfileEventsSnapshot
|
||||
{
|
||||
UInt64 thread_id;
|
||||
CountersIncrement counters;
|
||||
Int64 memory_usage;
|
||||
time_t current_time;
|
||||
};
|
||||
|
||||
using ThreadIdToCountersSnapshot = std::unordered_map<UInt64, Counters::Snapshot>;
|
||||
|
||||
/// Dumps profile events to columns Map(String, UInt64)
|
||||
void dumpToMapColumn(const Counters::Snapshot & counters, DB::IColumn * column, bool nonzero_only = true);
|
||||
|
||||
void getProfileEvents(
|
||||
const String & server_display_name,
|
||||
DB::InternalProfileEventsQueuePtr profile_queue,
|
||||
DB::Block & block,
|
||||
ThreadIdToCountersSnapshot & last_sent_snapshots);
|
||||
|
||||
/// This is for ProfileEvents packets.
|
||||
enum Type : int8_t
|
||||
{
|
||||
|
@ -31,7 +31,6 @@
|
||||
#include <Interpreters/InternalTextLogsQueue.h>
|
||||
#include <Interpreters/OpenTelemetrySpanLog.h>
|
||||
#include <Interpreters/Session.h>
|
||||
#include <Interpreters/ProfileEventsExt.h>
|
||||
#include <Server/TCPServer.h>
|
||||
#include <Storages/StorageReplicatedMergeTree.h>
|
||||
#include <Storages/MergeTree/MergeTreeDataPartUUID.h>
|
||||
@ -853,163 +852,15 @@ void TCPHandler::sendExtremes(const Block & extremes)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
namespace
|
||||
{
|
||||
using namespace ProfileEvents;
|
||||
|
||||
constexpr size_t NAME_COLUMN_INDEX = 4;
|
||||
constexpr size_t VALUE_COLUMN_INDEX = 5;
|
||||
|
||||
struct ProfileEventsSnapshot
|
||||
{
|
||||
UInt64 thread_id;
|
||||
ProfileEvents::CountersIncrement counters;
|
||||
Int64 memory_usage;
|
||||
time_t current_time;
|
||||
};
|
||||
|
||||
/*
|
||||
* Add records about provided non-zero ProfileEvents::Counters.
|
||||
*/
|
||||
void dumpProfileEvents(
|
||||
ProfileEventsSnapshot const & snapshot,
|
||||
MutableColumns & columns,
|
||||
String const & host_name)
|
||||
{
|
||||
size_t rows = 0;
|
||||
auto & name_column = columns[NAME_COLUMN_INDEX];
|
||||
auto & value_column = columns[VALUE_COLUMN_INDEX];
|
||||
for (ProfileEvents::Event event = 0; event < ProfileEvents::Counters::num_counters; ++event)
|
||||
{
|
||||
Int64 value = snapshot.counters[event];
|
||||
|
||||
if (value == 0)
|
||||
continue;
|
||||
|
||||
const char * desc = ProfileEvents::getName(event);
|
||||
name_column->insertData(desc, strlen(desc));
|
||||
value_column->insert(value);
|
||||
rows++;
|
||||
}
|
||||
|
||||
// Fill the rest of the columns with data
|
||||
for (size_t row = 0; row < rows; ++row)
|
||||
{
|
||||
size_t i = 0;
|
||||
columns[i++]->insertData(host_name.data(), host_name.size());
|
||||
columns[i++]->insert(UInt64(snapshot.current_time));
|
||||
columns[i++]->insert(UInt64{snapshot.thread_id});
|
||||
columns[i++]->insert(ProfileEvents::Type::INCREMENT);
|
||||
}
|
||||
}
|
||||
|
||||
void dumpMemoryTracker(
|
||||
ProfileEventsSnapshot const & snapshot,
|
||||
MutableColumns & columns,
|
||||
String const & host_name)
|
||||
{
|
||||
{
|
||||
size_t i = 0;
|
||||
columns[i++]->insertData(host_name.data(), host_name.size());
|
||||
columns[i++]->insert(UInt64(snapshot.current_time));
|
||||
columns[i++]->insert(UInt64{snapshot.thread_id});
|
||||
columns[i++]->insert(ProfileEvents::Type::GAUGE);
|
||||
|
||||
columns[i++]->insertData(MemoryTracker::USAGE_EVENT_NAME, strlen(MemoryTracker::USAGE_EVENT_NAME));
|
||||
columns[i++]->insert(snapshot.memory_usage);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void TCPHandler::sendProfileEvents()
|
||||
{
|
||||
if (client_tcp_protocol_version < DBMS_MIN_PROTOCOL_VERSION_WITH_INCREMENTAL_PROFILE_EVENTS)
|
||||
return;
|
||||
|
||||
NamesAndTypesList column_names_and_types = {
|
||||
{ "host_name", std::make_shared<DataTypeString>() },
|
||||
{ "current_time", std::make_shared<DataTypeDateTime>() },
|
||||
{ "thread_id", std::make_shared<DataTypeUInt64>() },
|
||||
{ "type", ProfileEvents::TypeEnum },
|
||||
{ "name", std::make_shared<DataTypeString>() },
|
||||
{ "value", std::make_shared<DataTypeInt64>() },
|
||||
};
|
||||
|
||||
ColumnsWithTypeAndName temp_columns;
|
||||
for (auto const & name_and_type : column_names_and_types)
|
||||
temp_columns.emplace_back(name_and_type.type, name_and_type.name);
|
||||
|
||||
Block block(std::move(temp_columns));
|
||||
|
||||
MutableColumns columns = block.mutateColumns();
|
||||
auto thread_group = CurrentThread::getGroup();
|
||||
auto const current_thread_id = CurrentThread::get().thread_id;
|
||||
std::vector<ProfileEventsSnapshot> snapshots;
|
||||
ThreadIdToCountersSnapshot new_snapshots;
|
||||
ProfileEventsSnapshot group_snapshot;
|
||||
Block block;
|
||||
ProfileEvents::getProfileEvents(server_display_name, state.profile_queue, block, last_sent_snapshots);
|
||||
if (block.rows() != 0)
|
||||
{
|
||||
auto stats = thread_group->getProfileEventsCountersAndMemoryForThreads();
|
||||
snapshots.reserve(stats.size());
|
||||
|
||||
for (auto & stat : stats)
|
||||
{
|
||||
auto const thread_id = stat.thread_id;
|
||||
if (thread_id == current_thread_id)
|
||||
continue;
|
||||
auto current_time = time(nullptr);
|
||||
auto previous_snapshot = last_sent_snapshots.find(thread_id);
|
||||
auto increment =
|
||||
previous_snapshot != last_sent_snapshots.end()
|
||||
? CountersIncrement(stat.counters, previous_snapshot->second)
|
||||
: CountersIncrement(stat.counters);
|
||||
snapshots.push_back(ProfileEventsSnapshot{
|
||||
thread_id,
|
||||
std::move(increment),
|
||||
stat.memory_usage,
|
||||
current_time
|
||||
});
|
||||
new_snapshots[thread_id] = std::move(stat.counters);
|
||||
}
|
||||
|
||||
group_snapshot.thread_id = 0;
|
||||
group_snapshot.current_time = time(nullptr);
|
||||
group_snapshot.memory_usage = thread_group->memory_tracker.get();
|
||||
auto group_counters = thread_group->performance_counters.getPartiallyAtomicSnapshot();
|
||||
auto prev_group_snapshot = last_sent_snapshots.find(0);
|
||||
group_snapshot.counters =
|
||||
prev_group_snapshot != last_sent_snapshots.end()
|
||||
? CountersIncrement(group_counters, prev_group_snapshot->second)
|
||||
: CountersIncrement(group_counters);
|
||||
new_snapshots[0] = std::move(group_counters);
|
||||
}
|
||||
last_sent_snapshots = std::move(new_snapshots);
|
||||
|
||||
for (auto & snapshot : snapshots)
|
||||
{
|
||||
dumpProfileEvents(snapshot, columns, server_display_name);
|
||||
dumpMemoryTracker(snapshot, columns, server_display_name);
|
||||
}
|
||||
dumpProfileEvents(group_snapshot, columns, server_display_name);
|
||||
dumpMemoryTracker(group_snapshot, columns, server_display_name);
|
||||
|
||||
MutableColumns logs_columns;
|
||||
Block curr_block;
|
||||
size_t rows = 0;
|
||||
|
||||
for (; state.profile_queue->tryPop(curr_block); ++rows)
|
||||
{
|
||||
auto curr_columns = curr_block.getColumns();
|
||||
for (size_t j = 0; j < curr_columns.size(); ++j)
|
||||
columns[j]->insertRangeFrom(*curr_columns[j], 0, curr_columns[j]->size());
|
||||
}
|
||||
|
||||
bool empty = columns[0]->empty();
|
||||
if (!empty)
|
||||
{
|
||||
block.setColumns(std::move(columns));
|
||||
|
||||
initProfileEventsBlockOutput(block);
|
||||
|
||||
writeVarUInt(Protocol::Server::ProfileEvents, *out);
|
||||
|
@ -3,9 +3,10 @@
|
||||
#include <Poco/Net/TCPServerConnection.h>
|
||||
|
||||
#include <base/getFQDNOrHostName.h>
|
||||
#include "Common/ProfileEvents.h"
|
||||
#include <Common/ProfileEvents.h>
|
||||
#include <Common/CurrentMetrics.h>
|
||||
#include <Common/Stopwatch.h>
|
||||
#include <Common/ThreadStatus.h>
|
||||
#include <Core/Protocol.h>
|
||||
#include <Core/QueryProcessingStage.h>
|
||||
#include <IO/Progress.h>
|
||||
@ -13,7 +14,7 @@
|
||||
#include <QueryPipeline/BlockIO.h>
|
||||
#include <Interpreters/InternalTextLogsQueue.h>
|
||||
#include <Interpreters/Context_fwd.h>
|
||||
#include <Formats/NativeReader.h>
|
||||
#include <Interpreters/ProfileEventsExt.h>
|
||||
|
||||
#include <Storages/MergeTree/ParallelReplicasReadingCoordinator.h>
|
||||
|
||||
@ -36,6 +37,8 @@ struct Settings;
|
||||
class ColumnsDescription;
|
||||
struct ProfileInfo;
|
||||
class TCPServer;
|
||||
class NativeWriter;
|
||||
class NativeReader;
|
||||
|
||||
/// State of query processing.
|
||||
struct QueryState
|
||||
@ -189,9 +192,7 @@ private:
|
||||
|
||||
CurrentMetrics::Increment metric_increment{CurrentMetrics::TCPConnection};
|
||||
|
||||
using ThreadIdToCountersSnapshot = std::unordered_map<UInt64, ProfileEvents::Counters::Snapshot>;
|
||||
|
||||
ThreadIdToCountersSnapshot last_sent_snapshots;
|
||||
ProfileEvents::ThreadIdToCountersSnapshot last_sent_snapshots;
|
||||
|
||||
/// It is the name of the server that will be sent to the client.
|
||||
String server_display_name;
|
||||
|
@ -74,6 +74,8 @@ EXTERN_TYPES_EXCLUDES=(
|
||||
ProfileEvents::Type
|
||||
ProfileEvents::TypeEnum
|
||||
ProfileEvents::dumpToMapColumn
|
||||
ProfileEvents::getProfileEvents
|
||||
ProfileEvents::ThreadIdToCountersSnapshot
|
||||
ProfileEvents::LOCAL_NAME
|
||||
ProfileEvents::CountersIncrement
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user