Added port column into system.processes table. [#METR-22966]

This commit is contained in:
Vitaliy Lyudvichenko 2016-10-11 21:54:33 +03:00
parent ce0c51f49c
commit 144d9018ba
10 changed files with 29 additions and 13 deletions

View File

@ -80,8 +80,9 @@ private:
using Shared = std::shared_ptr<ContextShared>;
Shared shared;
String user; /// Текущий пользователь.
Poco::Net::IPAddress ip_address; /// IP-адрес, с которого задан запрос.
String user; /// Current user
Poco::Net::IPAddress ip_address; /// IP address
UInt16 port; /// and port, from which current query was recieved
Interface interface = Interface::TCP;
HTTPMethod http_method = HTTPMethod::UNKNOWN; /// NOTE Возможно, перенести это в отдельный struct ClientInfo.
@ -121,9 +122,10 @@ public:
ConfigurationPtr getUsersConfig();
void setUser(const String & name, const String & password, const Poco::Net::IPAddress & address, const String & quota_key);
void setUser(const String & name, const String & password, const Poco::Net::IPAddress & address, UInt16 port, const String & quota_key);
String getUser() const { return user; }
Poco::Net::IPAddress getIPAddress() const { return ip_address; }
UInt16 getPort() const { return port; }
Interface getInterface() const { return interface; }
void setInterface(Interface interface_) { interface = interface_; }

View File

@ -33,6 +33,7 @@ struct ProcessInfo
String user;
String query_id;
Poco::Net::IPAddress ip_address;
UInt16 port;
double elapsed_seconds;
size_t rows;
size_t bytes;
@ -48,6 +49,7 @@ struct ProcessListElement
String user;
String query_id;
Poco::Net::IPAddress ip_address;
UInt16 port;
Stopwatch watch;
@ -67,9 +69,9 @@ struct ProcessListElement
ProcessListElement(const String & query_, const String & user_,
const String & query_id_, const Poco::Net::IPAddress & ip_address_,
size_t max_memory_usage, double memory_tracker_fault_probability,
UInt16 port_, size_t max_memory_usage, double memory_tracker_fault_probability,
QueryPriorities::Handle && priority_handle_)
: query(query_), user(user_), query_id(query_id_), ip_address(ip_address_), memory_tracker(max_memory_usage),
: query(query_), user(user_), query_id(query_id_), ip_address(ip_address_), port(port_), memory_tracker(max_memory_usage),
priority_handle(std::move(priority_handle_))
{
memory_tracker.setDescription("(for query)");
@ -101,6 +103,7 @@ struct ProcessListElement
.user = user,
.query_id = query_id,
.ip_address = ip_address,
.port = port,
.elapsed_seconds = watch.elapsedSeconds(),
.rows = progress.rows,
.bytes = progress.bytes,
@ -184,7 +187,7 @@ public:
* Если времени не хватило - кинуть исключение.
*/
EntryPtr insert(const String & query_, const String & user_, const String & query_id_, const Poco::Net::IPAddress & ip_address_,
const Settings & settings);
UInt16 port_, const Settings & settings);
/// Количество одновременно выполняющихся запросов.
size_t size() const { return cur_size; }

View File

@ -283,7 +283,7 @@ ConfigurationPtr Context::getUsersConfig()
}
void Context::setUser(const String & name, const String & password, const Poco::Net::IPAddress & address, const String & quota_key)
void Context::setUser(const String & name, const String & password, const Poco::Net::IPAddress & address, UInt16 port, const String & quota_key)
{
auto lock = getLock();
@ -291,8 +291,9 @@ void Context::setUser(const String & name, const String & password, const Poco::
setSetting("profile", user_props.profile);
setQuota(user_props.quota, quota_key, name, address);
user = name;
ip_address = address;
this->user = name;
this->ip_address = address;
this->port = port;
}

View File

@ -12,7 +12,7 @@ namespace ErrorCodes
ProcessList::EntryPtr ProcessList::insert(
const String & query_, const String & user_, const String & query_id_, const Poco::Net::IPAddress & ip_address_,
const Settings & settings)
UInt16 port_, const Settings & settings)
{
EntryPtr res;
@ -54,7 +54,7 @@ ProcessList::EntryPtr ProcessList::insert(
++cur_size;
res = std::make_shared<Entry>(*this, cont.emplace(cont.end(),
query_, user_, query_id_, ip_address_,
query_, user_, query_id_, ip_address_, port_,
settings.limits.max_memory_usage, settings.memory_tracker_fault_probability,
priorities.insert(settings.priority)));

View File

@ -184,6 +184,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
context.getUser(),
context.getCurrentQueryId(),
context.getIPAddress(),
context.getPort(),
settings);
context.setProcessListElement(&process_list_entry->get());

View File

@ -108,7 +108,7 @@ void HTTPHandler::processQuery(
Context context = *server.global_context;
context.setGlobalContext(*server.global_context);
context.setUser(user, password, request.clientAddress().host(), quota_key);
context.setUser(user, password, request.clientAddress().host(), request.clientAddress().port(), quota_key);
context.setCurrentQueryId(query_id);
std::unique_ptr<ReadBuffer> in_param = std::make_unique<ReadBufferFromString>(query_param);

View File

@ -464,7 +464,7 @@ void TCPHandler::receiveHello()
<< (!user.empty() ? ", user: " + user : "")
<< ".");
connection_context.setUser(user, password, socket().peerAddress().host(), "");
connection_context.setUser(user, password, socket().peerAddress().host(), socket().peerAddress().port(), "");
}

View File

@ -15,6 +15,7 @@ StorageSystemProcesses::StorageSystemProcesses(const std::string & name_)
, columns{
{ "user", std::make_shared<DataTypeString>() },
{ "address", std::make_shared<DataTypeString>() },
{ "port", std::make_shared<DataTypeUInt16>() },
{ "elapsed", std::make_shared<DataTypeFloat64>() },
{ "rows_read", std::make_shared<DataTypeUInt64>() },
{ "bytes_read", std::make_shared<DataTypeUInt64>() },
@ -46,6 +47,7 @@ BlockInputStreams StorageSystemProcesses::read(
ColumnWithTypeAndName col_user{std::make_shared<ColumnString>(), std::make_shared<DataTypeString>(), "user"};
ColumnWithTypeAndName col_address{std::make_shared<ColumnString>(), std::make_shared<DataTypeString>(), "address"};
ColumnWithTypeAndName col_port{std::make_shared<ColumnUInt16>(), std::make_shared<DataTypeUInt16>(), "port"};
ColumnWithTypeAndName col_elapsed{std::make_shared<ColumnFloat64>(), std::make_shared<DataTypeFloat64>(), "elapsed"};
ColumnWithTypeAndName col_rows_read{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "rows_read"};
ColumnWithTypeAndName col_bytes_read{std::make_shared<ColumnUInt64>(), std::make_shared<DataTypeUInt64>(), "bytes_read"};
@ -60,6 +62,7 @@ BlockInputStreams StorageSystemProcesses::read(
{
col_user.column->insert(process.user);
col_address.column->insert(process.ip_address.toString());
col_port.column->insert(static_cast<UInt64>(process.port));
col_elapsed.column->insert(process.elapsed_seconds);
col_rows_read.column->insert(process.rows);
col_bytes_read.column->insert(process.bytes);
@ -72,6 +75,7 @@ BlockInputStreams StorageSystemProcesses::read(
Block block{
col_user,
col_address,
col_port,
col_elapsed,
col_rows_read,
col_bytes_read,

View File

@ -0,0 +1,4 @@
#!/bin/bash
set -e
curl --local-port 1390 'http://localhost:8123?query=SELECT%20port%20FROM%20system.processes%20ORDER%20BY%20elapsed%20LIMIT%201'