diff --git a/programs/local/LocalServer.cpp b/programs/local/LocalServer.cpp index eb3a03d0564..eb562dfd9eb 100644 --- a/programs/local/LocalServer.cpp +++ b/programs/local/LocalServer.cpp @@ -411,7 +411,8 @@ void LocalServer::setupUsers() void LocalServer::connect() { connection_parameters = ConnectionParameters(config()); - connection = LocalConnection::createConnection(connection_parameters, global_context, need_render_progress); + connection = LocalConnection::createConnection( + connection_parameters, global_context, need_render_progress, need_render_profile_events, server_display_name); } diff --git a/src/Client/ClientBase.cpp b/src/Client/ClientBase.cpp index f89e3504c44..a69acb2ed82 100644 --- a/src/Client/ClientBase.cpp +++ b/src/Client/ClientBase.cpp @@ -867,7 +867,7 @@ void ClientBase::onProfileEvents(Block & block) if (rows == 0) return; - if (server_revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_INCREMENTAL_PROFILE_EVENTS) + if (getName() == "local" || server_revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_INCREMENTAL_PROFILE_EVENTS) { const auto & array_thread_id = typeid_cast(*block.getByName("thread_id").column).getData(); const auto & names = typeid_cast(*block.getByName("name").column); diff --git a/src/Client/ClientBase.h b/src/Client/ClientBase.h index 406e5aa66b7..e625d4a5c63 100644 --- a/src/Client/ClientBase.h +++ b/src/Client/ClientBase.h @@ -219,6 +219,7 @@ protected: ProgressIndication progress_indication; bool need_render_progress = true; + bool need_render_profile_events = true; bool written_first_block = false; size_t processed_rows = 0; /// How many rows have been read or written. diff --git a/src/Client/LocalConnection.cpp b/src/Client/LocalConnection.cpp index 021c68271a0..a57086810bf 100644 --- a/src/Client/LocalConnection.cpp +++ b/src/Client/LocalConnection.cpp @@ -6,6 +6,8 @@ #include #include #include +#include +#include namespace DB @@ -18,10 +20,12 @@ namespace ErrorCodes extern const int NOT_IMPLEMENTED; } -LocalConnection::LocalConnection(ContextPtr context_, bool send_progress_) +LocalConnection::LocalConnection(ContextPtr context_, bool send_progress_, bool send_profile_events_, const String & server_display_name_) : WithContext(context_) , session(getContext(), ClientInfo::Interface::LOCAL) , send_progress(send_progress_) + , send_profile_events(send_profile_events_) + , server_display_name(server_display_name_) { /// Authenticate and create a context to execute queries. session.authenticate("default", "", Poco::Net::SocketAddress{}); @@ -58,6 +62,11 @@ void LocalConnection::updateProgress(const Progress & value) state->progress.incrementPiecewiseAtomically(value); } +void LocalConnection::getProfileEvents(Block & block) +{ + ProfileEvents::getProfileEvents(server_display_name, state->profile_queue, block, last_sent_snapshots); +} + void LocalConnection::sendQuery( const ConnectionTimeouts &, const String & query, @@ -77,18 +86,23 @@ void LocalConnection::sendQuery( if (!current_database.empty()) query_context->setCurrentDatabase(current_database); - CurrentThread::QueryScope query_scope_holder(query_context); state.reset(); state.emplace(); state->query_id = query_id; state->query = query; + state->query_scope_holder = std::make_unique(query_context); state->stage = QueryProcessingStage::Enum(stage); + state->profile_queue = std::make_shared(std::numeric_limits::max()); + CurrentThread::attachInternalProfileEventsQueue(state->profile_queue); if (send_progress) state->after_send_progress.restart(); + if (send_profile_events) + state->after_send_profile_events.restart(); + next_packet_type.reset(); try @@ -231,6 +245,16 @@ bool LocalConnection::poll(size_t) return true; } + if (send_profile_events && (state->after_send_profile_events.elapsedMicroseconds() >= query_context->getSettingsRef().interactive_delay)) + { + Block block; + state->after_send_profile_events.restart(); + next_packet_type = Protocol::Server::ProfileEvents; + getProfileEvents(block); + state->block.emplace(std::move(block)); + return true; + } + try { pollImpl(); @@ -459,9 +483,14 @@ void LocalConnection::sendMergeTreeReadTaskResponse(const PartitionReadResponse throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented"); } -ServerConnectionPtr LocalConnection::createConnection(const ConnectionParameters &, ContextPtr current_context, bool send_progress) +ServerConnectionPtr LocalConnection::createConnection( + const ConnectionParameters &, + ContextPtr current_context, + bool send_progress, + bool send_profile_events, + const String & server_display_name) { - return std::make_unique(current_context, send_progress); + return std::make_unique(current_context, send_progress, send_profile_events, server_display_name); } diff --git a/src/Client/LocalConnection.h b/src/Client/LocalConnection.h index b85022cf183..62e95cdfee6 100644 --- a/src/Client/LocalConnection.h +++ b/src/Client/LocalConnection.h @@ -5,6 +5,7 @@ #include #include #include +#include #include @@ -29,6 +30,7 @@ struct LocalQueryState std::unique_ptr executor; std::unique_ptr pushing_executor; std::unique_ptr pushing_async_executor; + InternalProfileEventsQueuePtr profile_queue; std::optional exception; @@ -50,19 +52,28 @@ struct LocalQueryState Progress progress; /// Time after the last check to stop the request and send the progress. Stopwatch after_send_progress; + Stopwatch after_send_profile_events; + + std::unique_ptr query_scope_holder; }; class LocalConnection : public IServerConnection, WithContext { public: - explicit LocalConnection(ContextPtr context_, bool send_progress_ = false); + explicit LocalConnection( + ContextPtr context_, bool send_progress_ = false, bool send_profile_events_ = false, const String & server_display_name_ = ""); ~LocalConnection() override; IServerConnection::Type getConnectionType() const override { return IServerConnection::Type::LOCAL; } - static ServerConnectionPtr createConnection(const ConnectionParameters & connection_parameters, ContextPtr current_context, bool send_progress = false); + static ServerConnectionPtr createConnection( + const ConnectionParameters & connection_parameters, + ContextPtr current_context, + bool send_progress = false, + bool send_profile_events = false, + const String & server_display_name = ""); void setDefaultDatabase(const String & database) override; @@ -129,12 +140,16 @@ private: void updateProgress(const Progress & value); + void getProfileEvents(Block & block); + bool pollImpl(); ContextMutablePtr query_context; Session session; bool send_progress; + bool send_profile_events; + String server_display_name; String description = "clickhouse-local"; std::optional state; @@ -144,5 +159,7 @@ private: std::optional next_packet_type; String current_database; + + ProfileEvents::ThreadIdToCountersSnapshot last_sent_snapshots; }; } diff --git a/src/Client/Suggest.cpp b/src/Client/Suggest.cpp index 738c98d2119..b711008e233 100644 --- a/src/Client/Suggest.cpp +++ b/src/Client/Suggest.cpp @@ -103,6 +103,7 @@ void Suggest::load(ContextPtr context, const ConnectionParameters & connection_p { loading_thread = std::thread([context=Context::createCopy(context), connection_parameters, suggestion_limit, this] { + ThreadStatus thread_status; for (size_t retry = 0; retry < 10; ++retry) { try diff --git a/src/Interpreters/ProfileEventsExt.cpp b/src/Interpreters/ProfileEventsExt.cpp index 472efc109fb..ea87d565854 100644 --- a/src/Interpreters/ProfileEventsExt.cpp +++ b/src/Interpreters/ProfileEventsExt.cpp @@ -1,5 +1,7 @@ #include "ProfileEventsExt.h" #include +#include +#include #include #include #include @@ -36,7 +38,7 @@ void dumpToMapColumn(const Counters::Snapshot & counters, DB::IColumn * column, if (nonzero_only && 0 == value) continue; - const char * desc = ProfileEvents::getName(event); + const char * desc = getName(event); key_column.insertData(desc, strlen(desc)); value_column.insert(value); size++; @@ -45,4 +47,133 @@ void dumpToMapColumn(const Counters::Snapshot & counters, DB::IColumn * column, offsets.push_back(offsets.back() + size); } +/// Add records about provided non-zero ProfileEvents::Counters. +static void dumpProfileEvents(ProfileEventsSnapshot const & snapshot, DB::MutableColumns & columns, String const & host_name) +{ + size_t rows = 0; + auto & name_column = columns[NAME_COLUMN_INDEX]; + auto & value_column = columns[VALUE_COLUMN_INDEX]; + for (Event event = 0; event < Counters::num_counters; ++event) + { + Int64 value = snapshot.counters[event]; + + if (value == 0) + continue; + + const char * desc = getName(event); + name_column->insertData(desc, strlen(desc)); + value_column->insert(value); + rows++; + } + + // Fill the rest of the columns with data + for (size_t row = 0; row < rows; ++row) + { + size_t i = 0; + columns[i++]->insertData(host_name.data(), host_name.size()); + columns[i++]->insert(UInt64(snapshot.current_time)); + columns[i++]->insert(UInt64{snapshot.thread_id}); + columns[i++]->insert(Type::INCREMENT); + } +} + +static void dumpMemoryTracker(ProfileEventsSnapshot const & snapshot, DB::MutableColumns & columns, String const & host_name) +{ + size_t i = 0; + columns[i++]->insertData(host_name.data(), host_name.size()); + columns[i++]->insert(UInt64(snapshot.current_time)); + columns[i++]->insert(UInt64{snapshot.thread_id}); + columns[i++]->insert(Type::GAUGE); + + columns[i++]->insertData(MemoryTracker::USAGE_EVENT_NAME, strlen(MemoryTracker::USAGE_EVENT_NAME)); + columns[i++]->insert(snapshot.memory_usage); +} + +void getProfileEvents( + const String & server_display_name, + DB::InternalProfileEventsQueuePtr profile_queue, + DB::Block & block, + ThreadIdToCountersSnapshot & last_sent_snapshots) +{ + using namespace DB; + static const NamesAndTypesList column_names_and_types = { + {"host_name", std::make_shared()}, + {"current_time", std::make_shared()}, + {"thread_id", std::make_shared()}, + {"type", TypeEnum}, + {"name", std::make_shared()}, + {"value", std::make_shared()}, + }; + + ColumnsWithTypeAndName temp_columns; + for (auto const & name_and_type : column_names_and_types) + temp_columns.emplace_back(name_and_type.type, name_and_type.name); + + block = std::move(temp_columns); + MutableColumns columns = block.mutateColumns(); + auto thread_group = CurrentThread::getGroup(); + auto const current_thread_id = CurrentThread::get().thread_id; + std::vector snapshots; + ThreadIdToCountersSnapshot new_snapshots; + ProfileEventsSnapshot group_snapshot; + { + auto stats = thread_group->getProfileEventsCountersAndMemoryForThreads(); + snapshots.reserve(stats.size()); + + for (auto & stat : stats) + { + auto const thread_id = stat.thread_id; + if (thread_id == current_thread_id) + continue; + auto current_time = time(nullptr); + auto previous_snapshot = last_sent_snapshots.find(thread_id); + auto increment = + previous_snapshot != last_sent_snapshots.end() + ? CountersIncrement(stat.counters, previous_snapshot->second) + : CountersIncrement(stat.counters); + snapshots.push_back(ProfileEventsSnapshot{ + thread_id, + std::move(increment), + stat.memory_usage, + current_time + }); + new_snapshots[thread_id] = std::move(stat.counters); + } + + group_snapshot.thread_id = 0; + group_snapshot.current_time = time(nullptr); + group_snapshot.memory_usage = thread_group->memory_tracker.get(); + auto group_counters = thread_group->performance_counters.getPartiallyAtomicSnapshot(); + auto prev_group_snapshot = last_sent_snapshots.find(0); + group_snapshot.counters = + prev_group_snapshot != last_sent_snapshots.end() + ? CountersIncrement(group_counters, prev_group_snapshot->second) + : CountersIncrement(group_counters); + new_snapshots[0] = std::move(group_counters); + } + last_sent_snapshots = std::move(new_snapshots); + + for (auto & snapshot : snapshots) + { + dumpProfileEvents(snapshot, columns, server_display_name); + dumpMemoryTracker(snapshot, columns, server_display_name); + } + dumpProfileEvents(group_snapshot, columns, server_display_name); + dumpMemoryTracker(group_snapshot, columns, server_display_name); + + Block curr_block; + size_t rows = 0; + + for (; profile_queue->tryPop(curr_block); ++rows) + { + auto curr_columns = curr_block.getColumns(); + for (size_t j = 0; j < curr_columns.size(); ++j) + columns[j]->insertRangeFrom(*curr_columns[j], 0, curr_columns[j]->size()); + } + + bool empty = columns[0]->empty(); + if (!empty) + block.setColumns(std::move(columns)); +} + } diff --git a/src/Interpreters/ProfileEventsExt.h b/src/Interpreters/ProfileEventsExt.h index 8a92eadec79..7d9fc512d15 100644 --- a/src/Interpreters/ProfileEventsExt.h +++ b/src/Interpreters/ProfileEventsExt.h @@ -1,5 +1,6 @@ #pragma once #include +#include #include #include @@ -7,9 +8,28 @@ namespace ProfileEvents { +constexpr size_t NAME_COLUMN_INDEX = 4; +constexpr size_t VALUE_COLUMN_INDEX = 5; + +struct ProfileEventsSnapshot +{ + UInt64 thread_id; + CountersIncrement counters; + Int64 memory_usage; + time_t current_time; +}; + +using ThreadIdToCountersSnapshot = std::unordered_map; + /// Dumps profile events to columns Map(String, UInt64) void dumpToMapColumn(const Counters::Snapshot & counters, DB::IColumn * column, bool nonzero_only = true); +void getProfileEvents( + const String & server_display_name, + DB::InternalProfileEventsQueuePtr profile_queue, + DB::Block & block, + ThreadIdToCountersSnapshot & last_sent_snapshots); + /// This is for ProfileEvents packets. enum Type : int8_t { diff --git a/src/Server/TCPHandler.cpp b/src/Server/TCPHandler.cpp index 99523ff09e3..f4592a8b2c9 100644 --- a/src/Server/TCPHandler.cpp +++ b/src/Server/TCPHandler.cpp @@ -31,7 +31,6 @@ #include #include #include -#include #include #include #include @@ -853,163 +852,15 @@ void TCPHandler::sendExtremes(const Block & extremes) } } - -namespace -{ - using namespace ProfileEvents; - - constexpr size_t NAME_COLUMN_INDEX = 4; - constexpr size_t VALUE_COLUMN_INDEX = 5; - - struct ProfileEventsSnapshot - { - UInt64 thread_id; - ProfileEvents::CountersIncrement counters; - Int64 memory_usage; - time_t current_time; - }; - - /* - * Add records about provided non-zero ProfileEvents::Counters. - */ - void dumpProfileEvents( - ProfileEventsSnapshot const & snapshot, - MutableColumns & columns, - String const & host_name) - { - size_t rows = 0; - auto & name_column = columns[NAME_COLUMN_INDEX]; - auto & value_column = columns[VALUE_COLUMN_INDEX]; - for (ProfileEvents::Event event = 0; event < ProfileEvents::Counters::num_counters; ++event) - { - Int64 value = snapshot.counters[event]; - - if (value == 0) - continue; - - const char * desc = ProfileEvents::getName(event); - name_column->insertData(desc, strlen(desc)); - value_column->insert(value); - rows++; - } - - // Fill the rest of the columns with data - for (size_t row = 0; row < rows; ++row) - { - size_t i = 0; - columns[i++]->insertData(host_name.data(), host_name.size()); - columns[i++]->insert(UInt64(snapshot.current_time)); - columns[i++]->insert(UInt64{snapshot.thread_id}); - columns[i++]->insert(ProfileEvents::Type::INCREMENT); - } - } - - void dumpMemoryTracker( - ProfileEventsSnapshot const & snapshot, - MutableColumns & columns, - String const & host_name) - { - { - size_t i = 0; - columns[i++]->insertData(host_name.data(), host_name.size()); - columns[i++]->insert(UInt64(snapshot.current_time)); - columns[i++]->insert(UInt64{snapshot.thread_id}); - columns[i++]->insert(ProfileEvents::Type::GAUGE); - - columns[i++]->insertData(MemoryTracker::USAGE_EVENT_NAME, strlen(MemoryTracker::USAGE_EVENT_NAME)); - columns[i++]->insert(snapshot.memory_usage); - } - } -} - - void TCPHandler::sendProfileEvents() { if (client_tcp_protocol_version < DBMS_MIN_PROTOCOL_VERSION_WITH_INCREMENTAL_PROFILE_EVENTS) return; - NamesAndTypesList column_names_and_types = { - { "host_name", std::make_shared() }, - { "current_time", std::make_shared() }, - { "thread_id", std::make_shared() }, - { "type", ProfileEvents::TypeEnum }, - { "name", std::make_shared() }, - { "value", std::make_shared() }, - }; - - ColumnsWithTypeAndName temp_columns; - for (auto const & name_and_type : column_names_and_types) - temp_columns.emplace_back(name_and_type.type, name_and_type.name); - - Block block(std::move(temp_columns)); - - MutableColumns columns = block.mutateColumns(); - auto thread_group = CurrentThread::getGroup(); - auto const current_thread_id = CurrentThread::get().thread_id; - std::vector snapshots; - ThreadIdToCountersSnapshot new_snapshots; - ProfileEventsSnapshot group_snapshot; + Block block; + ProfileEvents::getProfileEvents(server_display_name, state.profile_queue, block, last_sent_snapshots); + if (block.rows() != 0) { - auto stats = thread_group->getProfileEventsCountersAndMemoryForThreads(); - snapshots.reserve(stats.size()); - - for (auto & stat : stats) - { - auto const thread_id = stat.thread_id; - if (thread_id == current_thread_id) - continue; - auto current_time = time(nullptr); - auto previous_snapshot = last_sent_snapshots.find(thread_id); - auto increment = - previous_snapshot != last_sent_snapshots.end() - ? CountersIncrement(stat.counters, previous_snapshot->second) - : CountersIncrement(stat.counters); - snapshots.push_back(ProfileEventsSnapshot{ - thread_id, - std::move(increment), - stat.memory_usage, - current_time - }); - new_snapshots[thread_id] = std::move(stat.counters); - } - - group_snapshot.thread_id = 0; - group_snapshot.current_time = time(nullptr); - group_snapshot.memory_usage = thread_group->memory_tracker.get(); - auto group_counters = thread_group->performance_counters.getPartiallyAtomicSnapshot(); - auto prev_group_snapshot = last_sent_snapshots.find(0); - group_snapshot.counters = - prev_group_snapshot != last_sent_snapshots.end() - ? CountersIncrement(group_counters, prev_group_snapshot->second) - : CountersIncrement(group_counters); - new_snapshots[0] = std::move(group_counters); - } - last_sent_snapshots = std::move(new_snapshots); - - for (auto & snapshot : snapshots) - { - dumpProfileEvents(snapshot, columns, server_display_name); - dumpMemoryTracker(snapshot, columns, server_display_name); - } - dumpProfileEvents(group_snapshot, columns, server_display_name); - dumpMemoryTracker(group_snapshot, columns, server_display_name); - - MutableColumns logs_columns; - Block curr_block; - size_t rows = 0; - - for (; state.profile_queue->tryPop(curr_block); ++rows) - { - auto curr_columns = curr_block.getColumns(); - for (size_t j = 0; j < curr_columns.size(); ++j) - columns[j]->insertRangeFrom(*curr_columns[j], 0, curr_columns[j]->size()); - } - - bool empty = columns[0]->empty(); - if (!empty) - { - block.setColumns(std::move(columns)); - initProfileEventsBlockOutput(block); writeVarUInt(Protocol::Server::ProfileEvents, *out); diff --git a/src/Server/TCPHandler.h b/src/Server/TCPHandler.h index 6afda654e6a..b6ce9fa7507 100644 --- a/src/Server/TCPHandler.h +++ b/src/Server/TCPHandler.h @@ -3,9 +3,10 @@ #include #include -#include "Common/ProfileEvents.h" +#include #include #include +#include #include #include #include @@ -13,7 +14,7 @@ #include #include #include -#include +#include #include @@ -36,6 +37,8 @@ struct Settings; class ColumnsDescription; struct ProfileInfo; class TCPServer; +class NativeWriter; +class NativeReader; /// State of query processing. struct QueryState @@ -189,9 +192,7 @@ private: CurrentMetrics::Increment metric_increment{CurrentMetrics::TCPConnection}; - using ThreadIdToCountersSnapshot = std::unordered_map; - - ThreadIdToCountersSnapshot last_sent_snapshots; + ProfileEvents::ThreadIdToCountersSnapshot last_sent_snapshots; /// It is the name of the server that will be sent to the client. String server_display_name; diff --git a/utils/check-style/check-style b/utils/check-style/check-style index d178778a410..6ebf53cb932 100755 --- a/utils/check-style/check-style +++ b/utils/check-style/check-style @@ -74,6 +74,8 @@ EXTERN_TYPES_EXCLUDES=( ProfileEvents::Type ProfileEvents::TypeEnum ProfileEvents::dumpToMapColumn + ProfileEvents::getProfileEvents + ProfileEvents::ThreadIdToCountersSnapshot ProfileEvents::LOCAL_NAME ProfileEvents::CountersIncrement