mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 07:31:57 +00:00
Merge pull request #30064 from azat/client-print-profile-events
This commit is contained in:
commit
4396daeb1f
@ -128,6 +128,8 @@ You can pass parameters to `clickhouse-client` (all parameters have a default va
|
||||
- `--history_file` — Path to a file containing command history.
|
||||
- `--param_<name>` — Value for a [query with parameters](#cli-queries-with-parameters).
|
||||
- `--hardware-utilization` — Print hardware utilization information in progress bar.
|
||||
- `--print-profile-events` – Print `ProfileEvents` packets.
|
||||
- `--profile-events-delay-ms` – Delay between printing `ProfileEvents` packets (-1 - print only totals, 0 - print every single packet).
|
||||
|
||||
Since version 20.5, `clickhouse-client` has automatic syntax highlighting (always enabled).
|
||||
|
||||
|
@ -267,7 +267,7 @@ void ClientBase::onLogData(Block & block)
|
||||
{
|
||||
initLogsOutputStream();
|
||||
progress_indication.clearProgressOutput();
|
||||
logs_out_stream->write(block);
|
||||
logs_out_stream->writeLogs(block);
|
||||
logs_out_stream->flush();
|
||||
}
|
||||
|
||||
@ -669,39 +669,61 @@ void ClientBase::onEndOfStream()
|
||||
void ClientBase::onProfileEvents(Block & block)
|
||||
{
|
||||
const auto rows = block.rows();
|
||||
if (rows == 0 || !progress_indication.print_hardware_utilization)
|
||||
if (rows == 0)
|
||||
return;
|
||||
const auto & array_thread_id = typeid_cast<const ColumnUInt64 &>(*block.getByName("thread_id").column).getData();
|
||||
const auto & names = typeid_cast<const ColumnString &>(*block.getByName("name").column);
|
||||
const auto & host_names = typeid_cast<const ColumnString &>(*block.getByName("host_name").column);
|
||||
const auto & array_values = typeid_cast<const ColumnUInt64 &>(*block.getByName("value").column).getData();
|
||||
|
||||
const auto * user_time_name = ProfileEvents::getName(ProfileEvents::UserTimeMicroseconds);
|
||||
const auto * system_time_name = ProfileEvents::getName(ProfileEvents::SystemTimeMicroseconds);
|
||||
|
||||
HostToThreadTimesMap thread_times;
|
||||
for (size_t i = 0; i < rows; ++i)
|
||||
if (progress_indication.print_hardware_utilization)
|
||||
{
|
||||
auto thread_id = array_thread_id[i];
|
||||
auto host_name = host_names.getDataAt(i).toString();
|
||||
if (thread_id != 0)
|
||||
progress_indication.addThreadIdToList(host_name, thread_id);
|
||||
auto event_name = names.getDataAt(i);
|
||||
auto value = array_values[i];
|
||||
if (event_name == user_time_name)
|
||||
const auto & array_thread_id = typeid_cast<const ColumnUInt64 &>(*block.getByName("thread_id").column).getData();
|
||||
const auto & names = typeid_cast<const ColumnString &>(*block.getByName("name").column);
|
||||
const auto & host_names = typeid_cast<const ColumnString &>(*block.getByName("host_name").column);
|
||||
const auto & array_values = typeid_cast<const ColumnUInt64 &>(*block.getByName("value").column).getData();
|
||||
|
||||
const auto * user_time_name = ProfileEvents::getName(ProfileEvents::UserTimeMicroseconds);
|
||||
const auto * system_time_name = ProfileEvents::getName(ProfileEvents::SystemTimeMicroseconds);
|
||||
|
||||
HostToThreadTimesMap thread_times;
|
||||
for (size_t i = 0; i < rows; ++i)
|
||||
{
|
||||
thread_times[host_name][thread_id].user_ms = value;
|
||||
auto thread_id = array_thread_id[i];
|
||||
auto host_name = host_names.getDataAt(i).toString();
|
||||
if (thread_id != 0)
|
||||
progress_indication.addThreadIdToList(host_name, thread_id);
|
||||
auto event_name = names.getDataAt(i);
|
||||
auto value = array_values[i];
|
||||
if (event_name == user_time_name)
|
||||
{
|
||||
thread_times[host_name][thread_id].user_ms = value;
|
||||
}
|
||||
else if (event_name == system_time_name)
|
||||
{
|
||||
thread_times[host_name][thread_id].system_ms = value;
|
||||
}
|
||||
else if (event_name == MemoryTracker::USAGE_EVENT_NAME)
|
||||
{
|
||||
thread_times[host_name][thread_id].memory_usage = value;
|
||||
}
|
||||
}
|
||||
else if (event_name == system_time_name)
|
||||
progress_indication.updateThreadEventData(thread_times);
|
||||
}
|
||||
|
||||
if (profile_events.print)
|
||||
{
|
||||
if (profile_events.watch.elapsedMilliseconds() >= profile_events.delay_ms)
|
||||
{
|
||||
thread_times[host_name][thread_id].system_ms = value;
|
||||
initLogsOutputStream();
|
||||
progress_indication.clearProgressOutput();
|
||||
logs_out_stream->writeProfileEvents(block);
|
||||
logs_out_stream->flush();
|
||||
|
||||
profile_events.watch.restart();
|
||||
profile_events.last_block = {};
|
||||
}
|
||||
else if (event_name == MemoryTracker::USAGE_EVENT_NAME)
|
||||
else
|
||||
{
|
||||
thread_times[host_name][thread_id].memory_usage = value;
|
||||
profile_events.last_block = block;
|
||||
}
|
||||
}
|
||||
progress_indication.updateThreadEventData(thread_times);
|
||||
}
|
||||
|
||||
|
||||
@ -1024,6 +1046,7 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
|
||||
processed_rows = 0;
|
||||
written_first_block = false;
|
||||
progress_indication.resetProgress();
|
||||
profile_events.watch.restart();
|
||||
|
||||
{
|
||||
/// Temporarily apply query settings to context.
|
||||
@ -1092,6 +1115,15 @@ void ClientBase::processParsedSingleQuery(const String & full_query, const Strin
|
||||
}
|
||||
}
|
||||
|
||||
/// Always print last block (if it was not printed already)
|
||||
if (profile_events.last_block)
|
||||
{
|
||||
initLogsOutputStream();
|
||||
progress_indication.clearProgressOutput();
|
||||
logs_out_stream->writeProfileEvents(profile_events.last_block);
|
||||
logs_out_stream->flush();
|
||||
}
|
||||
|
||||
if (is_interactive)
|
||||
{
|
||||
std::cout << std::endl << processed_rows << " rows in set. Elapsed: " << progress_indication.elapsedSeconds() << " sec. ";
|
||||
@ -1582,6 +1614,8 @@ void ClientBase::init(int argc, char ** argv)
|
||||
("ignore-error", "do not stop processing in multiquery mode")
|
||||
("stacktrace", "print stack traces of exceptions")
|
||||
("hardware-utilization", "print hardware utilization information in progress bar")
|
||||
("print-profile-events", po::value(&profile_events.print)->zero_tokens(), "Printing ProfileEvents packets")
|
||||
("profile-events-delay-ms", po::value<UInt64>()->default_value(profile_events.delay_ms), "Delay between printing `ProfileEvents` packets (-1 - print only totals, 0 - print every single packet)")
|
||||
;
|
||||
|
||||
addOptions(options_description);
|
||||
@ -1633,6 +1667,10 @@ void ClientBase::init(int argc, char ** argv)
|
||||
config().setBool("vertical", true);
|
||||
if (options.count("stacktrace"))
|
||||
config().setBool("stacktrace", true);
|
||||
if (options.count("print-profile-events"))
|
||||
config().setBool("print-profile-events", true);
|
||||
if (options.count("profile-events-delay-ms"))
|
||||
config().setInt("profile-events-delay-ms", options["profile-events-delay-ms"].as<UInt64>());
|
||||
if (options.count("progress"))
|
||||
config().setBool("progress", true);
|
||||
if (options.count("echo"))
|
||||
@ -1653,6 +1691,8 @@ void ClientBase::init(int argc, char ** argv)
|
||||
progress_indication.print_hardware_utilization = true;
|
||||
|
||||
query_processing_stage = QueryProcessingStage::fromString(options["stage"].as<std::string>());
|
||||
profile_events.print = options.count("print-profile-events");
|
||||
profile_events.delay_ms = options["profile-events-delay-ms"].as<UInt64>();
|
||||
|
||||
processOptions(options_description, options, external_tables_arguments);
|
||||
argsToConfig(common_arguments, config(), 100);
|
||||
|
@ -3,6 +3,7 @@
|
||||
#include <Common/ProgressIndication.h>
|
||||
#include <Common/InterruptListener.h>
|
||||
#include <Common/ShellCommand.h>
|
||||
#include <Common/Stopwatch.h>
|
||||
#include <Core/ExternalTable.h>
|
||||
#include <Poco/Util/Application.h>
|
||||
#include <Interpreters/Context.h>
|
||||
@ -218,6 +219,16 @@ protected:
|
||||
QueryFuzzer fuzzer;
|
||||
int query_fuzzer_runs = 0;
|
||||
|
||||
struct
|
||||
{
|
||||
bool print = false;
|
||||
/// UINT64_MAX -- print only last
|
||||
UInt64 delay_ms = 0;
|
||||
Stopwatch watch;
|
||||
/// For printing only last (delay_ms == 0).
|
||||
Block last_block;
|
||||
} profile_events;
|
||||
|
||||
QueryProcessingStage::Enum query_processing_stage;
|
||||
};
|
||||
|
||||
|
@ -1,6 +1,7 @@
|
||||
#include <Client/InternalTextLogs.h>
|
||||
#include <Core/Block.h>
|
||||
#include <Interpreters/InternalTextLogsQueue.h>
|
||||
#include <Interpreters/ProfileEventsExt.h>
|
||||
#include <Common/typeid_cast.h>
|
||||
#include <Common/HashTable/Hash.h>
|
||||
#include <DataTypes/IDataType.h>
|
||||
@ -13,7 +14,7 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
void InternalTextLogs::write(const Block & block)
|
||||
void InternalTextLogs::writeLogs(const Block & block)
|
||||
{
|
||||
const auto & array_event_time = typeid_cast<const ColumnUInt32 &>(*block.getByName("event_time").column).getData();
|
||||
const auto & array_microseconds = typeid_cast<const ColumnUInt32 &>(*block.getByName("event_time_microseconds").column).getData();
|
||||
@ -97,4 +98,69 @@ void InternalTextLogs::write(const Block & block)
|
||||
}
|
||||
}
|
||||
|
||||
void InternalTextLogs::writeProfileEvents(const Block & block)
|
||||
{
|
||||
const auto & column_host_name = typeid_cast<const ColumnString &>(*block.getByName("host_name").column);
|
||||
const auto & array_current_time = typeid_cast<const ColumnUInt32 &>(*block.getByName("current_time").column).getData();
|
||||
const auto & array_thread_id = typeid_cast<const ColumnUInt64 &>(*block.getByName("thread_id").column).getData();
|
||||
const auto & array_type = typeid_cast<const ColumnInt8 &>(*block.getByName("type").column).getData();
|
||||
const auto & column_name = typeid_cast<const ColumnString &>(*block.getByName("name").column);
|
||||
const auto & array_value = typeid_cast<const ColumnUInt64 &>(*block.getByName("value").column).getData();
|
||||
|
||||
for (size_t row_num = 0; row_num < block.rows(); ++row_num)
|
||||
{
|
||||
/// host_name
|
||||
auto host_name = column_host_name.getDataAt(row_num);
|
||||
if (host_name.size)
|
||||
{
|
||||
writeCString("[", wb);
|
||||
if (color)
|
||||
writeString(setColor(StringRefHash()(host_name)), wb);
|
||||
writeString(host_name, wb);
|
||||
if (color)
|
||||
writeCString(resetColor(), wb);
|
||||
writeCString("] ", wb);
|
||||
}
|
||||
|
||||
/// current_time
|
||||
auto current_time = array_current_time[row_num];
|
||||
writeDateTimeText<'.', ':'>(current_time, wb);
|
||||
|
||||
/// thread_id
|
||||
UInt64 thread_id = array_thread_id[row_num];
|
||||
writeCString(" [ ", wb);
|
||||
if (color)
|
||||
writeString(setColor(intHash64(thread_id)), wb);
|
||||
writeIntText(thread_id, wb);
|
||||
if (color)
|
||||
writeCString(resetColor(), wb);
|
||||
writeCString(" ] ", wb);
|
||||
|
||||
/// name
|
||||
auto name = column_name.getDataAt(row_num);
|
||||
if (color)
|
||||
writeString(setColor(StringRefHash()(name)), wb);
|
||||
DB::writeString(name, wb);
|
||||
if (color)
|
||||
writeCString(resetColor(), wb);
|
||||
writeCString(": ", wb);
|
||||
|
||||
/// value
|
||||
UInt64 value = array_value[row_num];
|
||||
writeIntText(value, wb);
|
||||
|
||||
//// type
|
||||
Int8 type = array_type[row_num];
|
||||
writeCString(" (", wb);
|
||||
if (color)
|
||||
writeString(setColor(intHash64(type)), wb);
|
||||
writeString(toString(ProfileEvents::TypeEnum->castToName(type)), wb);
|
||||
if (color)
|
||||
writeCString(resetColor(), wb);
|
||||
writeCString(")", wb);
|
||||
|
||||
writeChar('\n', wb);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -6,16 +6,37 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/// Prints internal server logs
|
||||
/// Input blocks have to have the same structure as SystemLogsQueue::getSampleBlock()
|
||||
/// Prints internal server logs or profile events with colored output (if requested).
|
||||
/// NOTE: IRowOutputFormat does not suite well for this case
|
||||
class InternalTextLogs
|
||||
{
|
||||
public:
|
||||
InternalTextLogs(WriteBuffer & buf_out, bool color_) : wb(buf_out), color(color_) {}
|
||||
|
||||
|
||||
void write(const Block & block);
|
||||
/// Print internal server logs
|
||||
///
|
||||
/// Input blocks have to have the same structure as SystemLogsQueue::getSampleBlock():
|
||||
/// - event_time
|
||||
/// - event_time_microseconds
|
||||
/// - host_name
|
||||
/// - query_id
|
||||
/// - thread_id
|
||||
/// - priority
|
||||
/// - source
|
||||
/// - text
|
||||
void writeLogs(const Block & block);
|
||||
/// Print profile events.
|
||||
///
|
||||
/// Block:
|
||||
/// - host_name
|
||||
/// - current_time
|
||||
/// - thread_id
|
||||
/// - type
|
||||
/// - name
|
||||
/// - value
|
||||
///
|
||||
/// See also TCPHandler::sendProfileEvents() for block columns.
|
||||
void writeProfileEvents(const Block & block);
|
||||
|
||||
void flush()
|
||||
{
|
||||
|
@ -11,6 +11,11 @@
|
||||
namespace ProfileEvents
|
||||
{
|
||||
|
||||
std::shared_ptr<DB::DataTypeEnum8> TypeEnum = std::make_shared<DB::DataTypeEnum8>(DB::DataTypeEnum8::Values{
|
||||
{ "increment", static_cast<Int8>(INCREMENT)},
|
||||
{ "gauge", static_cast<Int8>(GAUGE)},
|
||||
});
|
||||
|
||||
/// Put implementation here to avoid extra linking dependencies for clickhouse_common_io
|
||||
void dumpToMapColumn(const Counters::Snapshot & counters, DB::IColumn * column, bool nonzero_only)
|
||||
{
|
||||
|
@ -1,5 +1,6 @@
|
||||
#pragma once
|
||||
#include <Common/ProfileEvents.h>
|
||||
#include <DataTypes/DataTypeEnum.h>
|
||||
#include <Columns/IColumn.h>
|
||||
|
||||
|
||||
@ -9,4 +10,13 @@ namespace ProfileEvents
|
||||
/// Dumps profile events to columns Map(String, UInt64)
|
||||
void dumpToMapColumn(const Counters::Snapshot & counters, DB::IColumn * column, bool nonzero_only = true);
|
||||
|
||||
/// This is for ProfileEvents packets.
|
||||
enum Type : int8_t
|
||||
{
|
||||
INCREMENT = 1,
|
||||
GAUGE = 2,
|
||||
};
|
||||
|
||||
extern std::shared_ptr<DB::DataTypeEnum8> TypeEnum;
|
||||
|
||||
}
|
||||
|
@ -30,6 +30,7 @@
|
||||
#include <Interpreters/InternalTextLogsQueue.h>
|
||||
#include <Interpreters/OpenTelemetrySpanLog.h>
|
||||
#include <Interpreters/Session.h>
|
||||
#include <Interpreters/ProfileEventsExt.h>
|
||||
#include <Storages/StorageReplicatedMergeTree.h>
|
||||
#include <Storages/MergeTree/MergeTreeDataPartUUID.h>
|
||||
#include <Storages/StorageS3Cluster.h>
|
||||
@ -831,12 +832,6 @@ namespace
|
||||
{
|
||||
using namespace ProfileEvents;
|
||||
|
||||
enum ProfileEventTypes : int8_t
|
||||
{
|
||||
INCREMENT = 1,
|
||||
GAUGE = 2,
|
||||
};
|
||||
|
||||
constexpr size_t NAME_COLUMN_INDEX = 4;
|
||||
constexpr size_t VALUE_COLUMN_INDEX = 5;
|
||||
|
||||
@ -879,7 +874,7 @@ namespace
|
||||
columns[i++]->insertData(host_name.data(), host_name.size());
|
||||
columns[i++]->insert(UInt64(snapshot.current_time));
|
||||
columns[i++]->insert(UInt64{snapshot.thread_id});
|
||||
columns[i++]->insert(ProfileEventTypes::INCREMENT);
|
||||
columns[i++]->insert(ProfileEvents::Type::INCREMENT);
|
||||
}
|
||||
}
|
||||
|
||||
@ -893,7 +888,7 @@ namespace
|
||||
columns[i++]->insertData(host_name.data(), host_name.size());
|
||||
columns[i++]->insert(UInt64(snapshot.current_time));
|
||||
columns[i++]->insert(UInt64{snapshot.thread_id});
|
||||
columns[i++]->insert(ProfileEventTypes::GAUGE);
|
||||
columns[i++]->insert(ProfileEvents::Type::GAUGE);
|
||||
|
||||
columns[i++]->insertData(MemoryTracker::USAGE_EVENT_NAME, strlen(MemoryTracker::USAGE_EVENT_NAME));
|
||||
columns[i++]->insert(snapshot.memory_usage);
|
||||
@ -907,18 +902,11 @@ void TCPHandler::sendProfileEvents()
|
||||
if (client_tcp_protocol_version < DBMS_MIN_PROTOCOL_VERSION_WITH_PROFILE_EVENTS)
|
||||
return;
|
||||
|
||||
auto profile_event_type = std::make_shared<DataTypeEnum8>(
|
||||
DataTypeEnum8::Values
|
||||
{
|
||||
{ "increment", static_cast<Int8>(INCREMENT)},
|
||||
{ "gauge", static_cast<Int8>(GAUGE)},
|
||||
});
|
||||
|
||||
NamesAndTypesList column_names_and_types = {
|
||||
{ "host_name", std::make_shared<DataTypeString>() },
|
||||
{ "current_time", std::make_shared<DataTypeDateTime>() },
|
||||
{ "thread_id", std::make_shared<DataTypeUInt64>() },
|
||||
{ "type", profile_event_type },
|
||||
{ "type", ProfileEvents::TypeEnum },
|
||||
{ "name", std::make_shared<DataTypeString>() },
|
||||
{ "value", std::make_shared<DataTypeUInt64>() },
|
||||
};
|
||||
|
@ -0,0 +1,4 @@
|
||||
0
|
||||
SelectedRows: 131010 (increment)
|
||||
OK
|
||||
OK
|
15
tests/queries/0_stateless/02050_client_profile_events.sh
Executable file
15
tests/queries/0_stateless/02050_client_profile_events.sh
Executable file
@ -0,0 +1,15 @@
|
||||
#!/usr/bin/env bash
|
||||
# Tags: long
|
||||
|
||||
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
# shellcheck source=../shell_config.sh
|
||||
. "$CURDIR"/../shell_config.sh
|
||||
|
||||
# do not print any ProfileEvents packets
|
||||
$CLICKHOUSE_CLIENT -q 'select * from numbers(1e5) format Null' |& grep -c 'SelectedRows'
|
||||
# print only last
|
||||
$CLICKHOUSE_CLIENT --print-profile-events --profile-events-delay-ms=-1 -q 'select * from numbers(1e5) format Null' |& grep -o 'SelectedRows: .*$'
|
||||
# print everything
|
||||
test "$($CLICKHOUSE_CLIENT --print-profile-events -q 'select * from numbers(1e9) format Null' |& grep -c 'SelectedRows')" -gt 1 && echo OK || echo FAIL
|
||||
# print each 100 ms
|
||||
test "$($CLICKHOUSE_CLIENT --print-profile-events --profile-events-delay-ms=100 -q 'select * from numbers(1e9) format Null' |& grep -c 'SelectedRows')" -gt 1 && echo OK || echo FAIL
|
Loading…
Reference in New Issue
Block a user