add metrics for clickhouse-local

This commit is contained in:
taiyang-li 2022-02-15 16:25:07 +08:00
parent bc206ec423
commit b4440131d0
6 changed files with 180 additions and 78 deletions

View File

@ -6,6 +6,9 @@
#include <Processors/Executors/PushingAsyncPipelineExecutor.h>
#include <Storages/IStorage.h>
#include <Core/Protocol.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <Interpreters/ProfileEventsExt.h>
namespace DB
@ -18,10 +21,11 @@ namespace ErrorCodes
extern const int NOT_IMPLEMENTED;
}
LocalConnection::LocalConnection(ContextPtr context_, bool send_progress_)
LocalConnection::LocalConnection(ContextPtr context_, bool send_progress_, bool send_profile_events_)
: WithContext(context_)
, session(getContext(), ClientInfo::Interface::LOCAL)
, send_progress(send_progress_)
, send_profile_events(send_profile_events_)
{
/// Authenticate and create a context to execute queries.
session.authenticate("default", "", Poco::Net::SocketAddress{});
@ -58,6 +62,88 @@ void LocalConnection::updateProgress(const Progress & value)
state->progress.incrementPiecewiseAtomically(value);
}
void LocalConnection::updateProfileEvents(Block & block)
{
static const NamesAndTypesList column_names_and_types = {
{"host_name", std::make_shared<DataTypeString>()},
{"current_time", std::make_shared<DataTypeDateTime>()},
{"thread_id", std::make_shared<DataTypeUInt64>()},
{"type", ProfileEvents::TypeEnum},
{"name", std::make_shared<DataTypeString>()},
{"value", std::make_shared<DataTypeInt64>()},
};
ColumnsWithTypeAndName temp_columns;
for (auto const & name_and_type : column_names_and_types)
temp_columns.emplace_back(name_and_type.type, name_and_type.name);
using namespace ProfileEvents;
block = Block(std::move(temp_columns));
MutableColumns columns = block.mutateColumns();
auto thread_group = CurrentThread::getGroup();
auto const current_thread_id = CurrentThread::get().thread_id;
std::vector<ProfileEventsSnapshot> snapshots;
ThreadIdToCountersSnapshot new_snapshots;
ProfileEventsSnapshot group_snapshot;
{
auto stats = thread_group->getProfileEventsCountersAndMemoryForThreads();
snapshots.reserve(stats.size());
for (auto & stat : stats)
{
auto const thread_id = stat.thread_id;
if (thread_id == current_thread_id)
continue;
auto current_time = time(nullptr);
auto previous_snapshot = last_sent_snapshots.find(thread_id);
auto increment =
previous_snapshot != last_sent_snapshots.end()
? CountersIncrement(stat.counters, previous_snapshot->second)
: CountersIncrement(stat.counters);
snapshots.push_back(ProfileEventsSnapshot{
thread_id,
std::move(increment),
stat.memory_usage,
current_time
});
new_snapshots[thread_id] = std::move(stat.counters);
}
group_snapshot.thread_id = 0;
group_snapshot.current_time = time(nullptr);
group_snapshot.memory_usage = thread_group->memory_tracker.get();
auto group_counters = thread_group->performance_counters.getPartiallyAtomicSnapshot();
auto prev_group_snapshot = last_sent_snapshots.find(0);
group_snapshot.counters =
prev_group_snapshot != last_sent_snapshots.end()
? CountersIncrement(group_counters, prev_group_snapshot->second)
: CountersIncrement(group_counters);
new_snapshots[0] = std::move(group_counters);
}
last_sent_snapshots = std::move(new_snapshots);
const String server_display_name = "localhost";
for (auto & snapshot : snapshots)
{
dumpProfileEvents(snapshot, columns, server_display_name);
dumpMemoryTracker(snapshot, columns, server_display_name);
}
dumpProfileEvents(group_snapshot, columns, server_display_name);
dumpMemoryTracker(group_snapshot, columns, server_display_name);
MutableColumns logs_columns;
Block curr_block;
size_t rows = 0;
for (; state->profile_queue->tryPop(curr_block); ++rows)
{
auto curr_columns = curr_block.getColumns();
for (size_t j = 0; j < curr_columns.size(); ++j)
columns[j]->insertRangeFrom(*curr_columns[j], 0, curr_columns[j]->size());
}
}
void LocalConnection::sendQuery(
const ConnectionTimeouts &,
const String & query,
@ -85,10 +171,15 @@ void LocalConnection::sendQuery(
state->query_id = query_id;
state->query = query;
state->stage = QueryProcessingStage::Enum(stage);
state->profile_queue = std::make_shared<InternalProfileEventsQueue>(std::numeric_limits<int>::max());
CurrentThread::attachInternalProfileEventsQueue(state->profile_queue);
if (send_progress)
state->after_send_progress.restart();
if (send_profile_events)
state->after_send_profile_events.restart();
next_packet_type.reset();
try
@ -231,6 +322,13 @@ bool LocalConnection::poll(size_t)
return true;
}
if (send_profile_events && (state->after_send_profile_events.elapsedMicroseconds() >= query_context->getSettingsRef().interactive_delay))
{
state->after_send_profile_events.restart();
next_packet_type = Protocol::Server::ProfileEvents;
return true;
}
try
{
pollImpl();

View File

@ -5,6 +5,7 @@
#include <QueryPipeline/BlockIO.h>
#include <IO/TimeoutSetter.h>
#include <Interpreters/Session.h>
#include <Interpreters/ProfileEventsExt.h>
#include <Storages/ColumnsDescription.h>
@ -29,6 +30,7 @@ struct LocalQueryState
std::unique_ptr<PullingAsyncPipelineExecutor> executor;
std::unique_ptr<PushingPipelineExecutor> pushing_executor;
std::unique_ptr<PushingAsyncPipelineExecutor> pushing_async_executor;
InternalProfileEventsQueuePtr profile_queue;
std::optional<Exception> exception;
@ -50,13 +52,15 @@ struct LocalQueryState
Progress progress;
/// Time after the last check to stop the request and send the progress.
Stopwatch after_send_progress;
Stopwatch after_send_profile_events;
};
class LocalConnection : public IServerConnection, WithContext
{
public:
explicit LocalConnection(ContextPtr context_, bool send_progress_ = false);
explicit LocalConnection(ContextPtr context_, bool send_progress_ = false, bool send_profile_events_ = false);
~LocalConnection() override;
@ -129,12 +133,15 @@ private:
void updateProgress(const Progress & value);
void updateProfileEvents(Block & block);
bool pollImpl();
ContextMutablePtr query_context;
Session session;
bool send_progress;
bool send_profile_events;
String description = "clickhouse-local";
std::optional<LocalQueryState> state;
@ -144,5 +151,7 @@ private:
std::optional<UInt64> next_packet_type;
String current_database;
ProfileEvents::ThreadIdToCountersSnapshot last_sent_snapshots;
};
}

View File

@ -1,5 +1,6 @@
#include "ProfileEventsExt.h"
#include <Common/typeid_cast.h>
#include <Common/MemoryTracker.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnArray.h>
@ -45,4 +46,48 @@ void dumpToMapColumn(const Counters::Snapshot & counters, DB::IColumn * column,
offsets.push_back(offsets.back() + size);
}
void dumpProfileEvents(ProfileEventsSnapshot const & snapshot, DB::MutableColumns & columns, String const & host_name)
{
size_t rows = 0;
auto & name_column = columns[NAME_COLUMN_INDEX];
auto & value_column = columns[VALUE_COLUMN_INDEX];
for (ProfileEvents::Event event = 0; event < ProfileEvents::Counters::num_counters; ++event)
{
Int64 value = snapshot.counters[event];
if (value == 0)
continue;
const char * desc = ProfileEvents::getName(event);
name_column->insertData(desc, strlen(desc));
value_column->insert(value);
rows++;
}
// Fill the rest of the columns with data
for (size_t row = 0; row < rows; ++row)
{
size_t i = 0;
columns[i++]->insertData(host_name.data(), host_name.size());
columns[i++]->insert(UInt64(snapshot.current_time));
columns[i++]->insert(UInt64{snapshot.thread_id});
columns[i++]->insert(ProfileEvents::Type::INCREMENT);
}
}
void dumpMemoryTracker(ProfileEventsSnapshot const & snapshot, DB::MutableColumns & columns, String const & host_name)
{
{
size_t i = 0;
columns[i++]->insertData(host_name.data(), host_name.size());
columns[i++]->insert(UInt64(snapshot.current_time));
columns[i++]->insert(UInt64{snapshot.thread_id});
columns[i++]->insert(ProfileEvents::Type::GAUGE);
columns[i++]->insertData(MemoryTracker::USAGE_EVENT_NAME, strlen(MemoryTracker::USAGE_EVENT_NAME));
columns[i++]->insert(snapshot.memory_usage);
}
}
}

View File

@ -7,9 +7,27 @@
namespace ProfileEvents
{
constexpr size_t NAME_COLUMN_INDEX = 4;
constexpr size_t VALUE_COLUMN_INDEX = 5;
struct ProfileEventsSnapshot
{
UInt64 thread_id;
ProfileEvents::CountersIncrement counters;
Int64 memory_usage;
time_t current_time;
};
using ThreadIdToCountersSnapshot = std::unordered_map<UInt64, ProfileEvents::Counters::Snapshot>;
/// Dumps profile events to columns Map(String, UInt64)
void dumpToMapColumn(const Counters::Snapshot & counters, DB::IColumn * column, bool nonzero_only = true);
/// Add records about provided non-zero ProfileEvents::Counters.
void dumpProfileEvents(ProfileEventsSnapshot const & snapshot, DB::MutableColumns & columns, String const & host_name);
void dumpMemoryTracker(ProfileEventsSnapshot const & snapshot, DB::MutableColumns & columns, String const & host_name);
/// This is for ProfileEvents packets.
enum Type : int8_t
{

View File

@ -853,82 +853,12 @@ void TCPHandler::sendExtremes(const Block & extremes)
}
}
namespace
{
using namespace ProfileEvents;
constexpr size_t NAME_COLUMN_INDEX = 4;
constexpr size_t VALUE_COLUMN_INDEX = 5;
struct ProfileEventsSnapshot
{
UInt64 thread_id;
ProfileEvents::CountersIncrement counters;
Int64 memory_usage;
time_t current_time;
};
/*
* Add records about provided non-zero ProfileEvents::Counters.
*/
void dumpProfileEvents(
ProfileEventsSnapshot const & snapshot,
MutableColumns & columns,
String const & host_name)
{
size_t rows = 0;
auto & name_column = columns[NAME_COLUMN_INDEX];
auto & value_column = columns[VALUE_COLUMN_INDEX];
for (ProfileEvents::Event event = 0; event < ProfileEvents::Counters::num_counters; ++event)
{
Int64 value = snapshot.counters[event];
if (value == 0)
continue;
const char * desc = ProfileEvents::getName(event);
name_column->insertData(desc, strlen(desc));
value_column->insert(value);
rows++;
}
// Fill the rest of the columns with data
for (size_t row = 0; row < rows; ++row)
{
size_t i = 0;
columns[i++]->insertData(host_name.data(), host_name.size());
columns[i++]->insert(UInt64(snapshot.current_time));
columns[i++]->insert(UInt64{snapshot.thread_id});
columns[i++]->insert(ProfileEvents::Type::INCREMENT);
}
}
void dumpMemoryTracker(
ProfileEventsSnapshot const & snapshot,
MutableColumns & columns,
String const & host_name)
{
{
size_t i = 0;
columns[i++]->insertData(host_name.data(), host_name.size());
columns[i++]->insert(UInt64(snapshot.current_time));
columns[i++]->insert(UInt64{snapshot.thread_id});
columns[i++]->insert(ProfileEvents::Type::GAUGE);
columns[i++]->insertData(MemoryTracker::USAGE_EVENT_NAME, strlen(MemoryTracker::USAGE_EVENT_NAME));
columns[i++]->insert(snapshot.memory_usage);
}
}
}
void TCPHandler::sendProfileEvents()
{
if (client_tcp_protocol_version < DBMS_MIN_PROTOCOL_VERSION_WITH_INCREMENTAL_PROFILE_EVENTS)
return;
NamesAndTypesList column_names_and_types = {
static const NamesAndTypesList column_names_and_types = {
{ "host_name", std::make_shared<DataTypeString>() },
{ "current_time", std::make_shared<DataTypeDateTime>() },
{ "thread_id", std::make_shared<DataTypeUInt64>() },
@ -943,6 +873,7 @@ void TCPHandler::sendProfileEvents()
Block block(std::move(temp_columns));
using namespace ProfileEvents;
MutableColumns columns = block.mutateColumns();
auto thread_group = CurrentThread::getGroup();
auto const current_thread_id = CurrentThread::get().thread_id;

View File

@ -3,9 +3,10 @@
#include <Poco/Net/TCPServerConnection.h>
#include <base/getFQDNOrHostName.h>
#include "Common/ProfileEvents.h"
#include <Common/ProfileEvents.h>
#include <Common/CurrentMetrics.h>
#include <Common/Stopwatch.h>
#include <Common/ThreadStatus.h>
#include <Core/Protocol.h>
#include <Core/QueryProcessingStage.h>
#include <IO/Progress.h>
@ -13,7 +14,7 @@
#include <QueryPipeline/BlockIO.h>
#include <Interpreters/InternalTextLogsQueue.h>
#include <Interpreters/Context_fwd.h>
#include <Formats/NativeReader.h>
#include <Interpreters/ProfileEventsExt.h>
#include <Storages/MergeTree/ParallelReplicasReadingCoordinator.h>
@ -36,6 +37,8 @@ struct Settings;
class ColumnsDescription;
struct ProfileInfo;
class TCPServer;
class NativeWriter;
class NativeReader;
/// State of query processing.
struct QueryState
@ -189,9 +192,7 @@ private:
CurrentMetrics::Increment metric_increment{CurrentMetrics::TCPConnection};
using ThreadIdToCountersSnapshot = std::unordered_map<UInt64, ProfileEvents::Counters::Snapshot>;
ThreadIdToCountersSnapshot last_sent_snapshots;
ProfileEvents::ThreadIdToCountersSnapshot last_sent_snapshots;
/// It is the name of the server that will be sent to the client.
String server_display_name;