From 144d9018baf8200a45e5576c32ec87c3c08eeb28 Mon Sep 17 00:00:00 2001 From: Vitaliy Lyudvichenko Date: Tue, 11 Oct 2016 21:54:33 +0300 Subject: [PATCH] Added port column into system.processes table. [#METR-22966] --- dbms/include/DB/Interpreters/Context.h | 8 +++++--- dbms/include/DB/Interpreters/ProcessList.h | 9 ++++++--- dbms/src/Interpreters/Context.cpp | 7 ++++--- dbms/src/Interpreters/ProcessList.cpp | 4 ++-- dbms/src/Interpreters/executeQuery.cpp | 1 + dbms/src/Server/HTTPHandler.cpp | 2 +- dbms/src/Server/TCPHandler.cpp | 2 +- dbms/src/Storages/System/StorageSystemProcesses.cpp | 4 ++++ .../0_stateless/00379_system_processes_port.reference | 1 + .../queries/0_stateless/00379_system_processes_port.sh | 4 ++++ 10 files changed, 29 insertions(+), 13 deletions(-) create mode 100644 dbms/tests/queries/0_stateless/00379_system_processes_port.reference create mode 100755 dbms/tests/queries/0_stateless/00379_system_processes_port.sh diff --git a/dbms/include/DB/Interpreters/Context.h b/dbms/include/DB/Interpreters/Context.h index 63548819d8d..3a6c49dd9e4 100644 --- a/dbms/include/DB/Interpreters/Context.h +++ b/dbms/include/DB/Interpreters/Context.h @@ -80,8 +80,9 @@ private: using Shared = std::shared_ptr; Shared shared; - String user; /// Текущий пользователь. - Poco::Net::IPAddress ip_address; /// IP-адрес, с которого задан запрос. + String user; /// Current user + Poco::Net::IPAddress ip_address; /// IP address + UInt16 port; /// and port, from which current query was recieved Interface interface = Interface::TCP; HTTPMethod http_method = HTTPMethod::UNKNOWN; /// NOTE Возможно, перенести это в отдельный struct ClientInfo. @@ -121,9 +122,10 @@ public: ConfigurationPtr getUsersConfig(); - void setUser(const String & name, const String & password, const Poco::Net::IPAddress & address, const String & quota_key); + void setUser(const String & name, const String & password, const Poco::Net::IPAddress & address, UInt16 port, const String & quota_key); String getUser() const { return user; } Poco::Net::IPAddress getIPAddress() const { return ip_address; } + UInt16 getPort() const { return port; } Interface getInterface() const { return interface; } void setInterface(Interface interface_) { interface = interface_; } diff --git a/dbms/include/DB/Interpreters/ProcessList.h b/dbms/include/DB/Interpreters/ProcessList.h index 712b32ab5db..500421344b8 100644 --- a/dbms/include/DB/Interpreters/ProcessList.h +++ b/dbms/include/DB/Interpreters/ProcessList.h @@ -33,6 +33,7 @@ struct ProcessInfo String user; String query_id; Poco::Net::IPAddress ip_address; + UInt16 port; double elapsed_seconds; size_t rows; size_t bytes; @@ -48,6 +49,7 @@ struct ProcessListElement String user; String query_id; Poco::Net::IPAddress ip_address; + UInt16 port; Stopwatch watch; @@ -67,9 +69,9 @@ struct ProcessListElement ProcessListElement(const String & query_, const String & user_, const String & query_id_, const Poco::Net::IPAddress & ip_address_, - size_t max_memory_usage, double memory_tracker_fault_probability, + UInt16 port_, size_t max_memory_usage, double memory_tracker_fault_probability, QueryPriorities::Handle && priority_handle_) - : query(query_), user(user_), query_id(query_id_), ip_address(ip_address_), memory_tracker(max_memory_usage), + : query(query_), user(user_), query_id(query_id_), ip_address(ip_address_), port(port_), memory_tracker(max_memory_usage), priority_handle(std::move(priority_handle_)) { memory_tracker.setDescription("(for query)"); @@ -101,6 +103,7 @@ struct ProcessListElement .user = user, .query_id = query_id, .ip_address = ip_address, + .port = port, .elapsed_seconds = watch.elapsedSeconds(), .rows = progress.rows, .bytes = progress.bytes, @@ -184,7 +187,7 @@ public: * Если времени не хватило - кинуть исключение. */ EntryPtr insert(const String & query_, const String & user_, const String & query_id_, const Poco::Net::IPAddress & ip_address_, - const Settings & settings); + UInt16 port_, const Settings & settings); /// Количество одновременно выполняющихся запросов. size_t size() const { return cur_size; } diff --git a/dbms/src/Interpreters/Context.cpp b/dbms/src/Interpreters/Context.cpp index 97982b648d9..64c34aeb82b 100644 --- a/dbms/src/Interpreters/Context.cpp +++ b/dbms/src/Interpreters/Context.cpp @@ -283,7 +283,7 @@ ConfigurationPtr Context::getUsersConfig() } -void Context::setUser(const String & name, const String & password, const Poco::Net::IPAddress & address, const String & quota_key) +void Context::setUser(const String & name, const String & password, const Poco::Net::IPAddress & address, UInt16 port, const String & quota_key) { auto lock = getLock(); @@ -291,8 +291,9 @@ void Context::setUser(const String & name, const String & password, const Poco:: setSetting("profile", user_props.profile); setQuota(user_props.quota, quota_key, name, address); - user = name; - ip_address = address; + this->user = name; + this->ip_address = address; + this->port = port; } diff --git a/dbms/src/Interpreters/ProcessList.cpp b/dbms/src/Interpreters/ProcessList.cpp index b0e21b554ca..b1518c27e81 100644 --- a/dbms/src/Interpreters/ProcessList.cpp +++ b/dbms/src/Interpreters/ProcessList.cpp @@ -12,7 +12,7 @@ namespace ErrorCodes ProcessList::EntryPtr ProcessList::insert( const String & query_, const String & user_, const String & query_id_, const Poco::Net::IPAddress & ip_address_, - const Settings & settings) + UInt16 port_, const Settings & settings) { EntryPtr res; @@ -54,7 +54,7 @@ ProcessList::EntryPtr ProcessList::insert( ++cur_size; res = std::make_shared(*this, cont.emplace(cont.end(), - query_, user_, query_id_, ip_address_, + query_, user_, query_id_, ip_address_, port_, settings.limits.max_memory_usage, settings.memory_tracker_fault_probability, priorities.insert(settings.priority))); diff --git a/dbms/src/Interpreters/executeQuery.cpp b/dbms/src/Interpreters/executeQuery.cpp index 8f95508e6d5..f2faed4c41e 100644 --- a/dbms/src/Interpreters/executeQuery.cpp +++ b/dbms/src/Interpreters/executeQuery.cpp @@ -184,6 +184,7 @@ static std::tuple executeQueryImpl( context.getUser(), context.getCurrentQueryId(), context.getIPAddress(), + context.getPort(), settings); context.setProcessListElement(&process_list_entry->get()); diff --git a/dbms/src/Server/HTTPHandler.cpp b/dbms/src/Server/HTTPHandler.cpp index 0362f3fe81b..8635675d97d 100644 --- a/dbms/src/Server/HTTPHandler.cpp +++ b/dbms/src/Server/HTTPHandler.cpp @@ -108,7 +108,7 @@ void HTTPHandler::processQuery( Context context = *server.global_context; context.setGlobalContext(*server.global_context); - context.setUser(user, password, request.clientAddress().host(), quota_key); + context.setUser(user, password, request.clientAddress().host(), request.clientAddress().port(), quota_key); context.setCurrentQueryId(query_id); std::unique_ptr in_param = std::make_unique(query_param); diff --git a/dbms/src/Server/TCPHandler.cpp b/dbms/src/Server/TCPHandler.cpp index d64e39897a8..3e996bb06b6 100644 --- a/dbms/src/Server/TCPHandler.cpp +++ b/dbms/src/Server/TCPHandler.cpp @@ -464,7 +464,7 @@ void TCPHandler::receiveHello() << (!user.empty() ? ", user: " + user : "") << "."); - connection_context.setUser(user, password, socket().peerAddress().host(), ""); + connection_context.setUser(user, password, socket().peerAddress().host(), socket().peerAddress().port(), ""); } diff --git a/dbms/src/Storages/System/StorageSystemProcesses.cpp b/dbms/src/Storages/System/StorageSystemProcesses.cpp index edf05b5bf0d..6f0ebd72f59 100644 --- a/dbms/src/Storages/System/StorageSystemProcesses.cpp +++ b/dbms/src/Storages/System/StorageSystemProcesses.cpp @@ -15,6 +15,7 @@ StorageSystemProcesses::StorageSystemProcesses(const std::string & name_) , columns{ { "user", std::make_shared() }, { "address", std::make_shared() }, + { "port", std::make_shared() }, { "elapsed", std::make_shared() }, { "rows_read", std::make_shared() }, { "bytes_read", std::make_shared() }, @@ -46,6 +47,7 @@ BlockInputStreams StorageSystemProcesses::read( ColumnWithTypeAndName col_user{std::make_shared(), std::make_shared(), "user"}; ColumnWithTypeAndName col_address{std::make_shared(), std::make_shared(), "address"}; + ColumnWithTypeAndName col_port{std::make_shared(), std::make_shared(), "port"}; ColumnWithTypeAndName col_elapsed{std::make_shared(), std::make_shared(), "elapsed"}; ColumnWithTypeAndName col_rows_read{std::make_shared(), std::make_shared(), "rows_read"}; ColumnWithTypeAndName col_bytes_read{std::make_shared(), std::make_shared(), "bytes_read"}; @@ -60,6 +62,7 @@ BlockInputStreams StorageSystemProcesses::read( { col_user.column->insert(process.user); col_address.column->insert(process.ip_address.toString()); + col_port.column->insert(static_cast(process.port)); col_elapsed.column->insert(process.elapsed_seconds); col_rows_read.column->insert(process.rows); col_bytes_read.column->insert(process.bytes); @@ -72,6 +75,7 @@ BlockInputStreams StorageSystemProcesses::read( Block block{ col_user, col_address, + col_port, col_elapsed, col_rows_read, col_bytes_read, diff --git a/dbms/tests/queries/0_stateless/00379_system_processes_port.reference b/dbms/tests/queries/0_stateless/00379_system_processes_port.reference new file mode 100644 index 00000000000..bbeb054e2ef --- /dev/null +++ b/dbms/tests/queries/0_stateless/00379_system_processes_port.reference @@ -0,0 +1 @@ +1390 diff --git a/dbms/tests/queries/0_stateless/00379_system_processes_port.sh b/dbms/tests/queries/0_stateless/00379_system_processes_port.sh new file mode 100755 index 00000000000..ebf7e7c3bf4 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00379_system_processes_port.sh @@ -0,0 +1,4 @@ +#!/bin/bash +set -e + +curl --local-port 1390 'http://localhost:8123?query=SELECT%20port%20FROM%20system.processes%20ORDER%20BY%20elapsed%20LIMIT%201'