Merge pull request #10270 from ClickHouse/quota-key-in-client

Support quota_key for Native client
This commit is contained in:
alexey-milovidov 2020-05-17 14:09:40 +03:00 committed by GitHub
commit 7cf3538840
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 97 additions and 59 deletions

View File

@ -398,6 +398,10 @@ private:
ignore_error = config().getBool("ignore-error", false); ignore_error = config().getBool("ignore-error", false);
} }
ClientInfo & client_info = context.getClientInfo();
client_info.setInitialQuery();
client_info.quota_key = config().getString("quota_key", "");
connect(); connect();
/// Initialize DateLUT here to avoid counting time spent here as query execution time. /// Initialize DateLUT here to avoid counting time spent here as query execution time.
@ -606,9 +610,7 @@ private:
server_version = toString(server_version_major) + "." + toString(server_version_minor) + "." + toString(server_version_patch); server_version = toString(server_version_major) + "." + toString(server_version_minor) + "." + toString(server_version_patch);
if ( if (server_display_name = connection->getServerDisplayName(connection_parameters.timeouts); server_display_name.empty())
server_display_name = connection->getServerDisplayName(connection_parameters.timeouts);
server_display_name.length() == 0)
{ {
server_display_name = config().getString("host", "localhost"); server_display_name = config().getString("host", "localhost");
} }
@ -914,7 +916,7 @@ private:
query_id, query_id,
QueryProcessingStage::Complete, QueryProcessingStage::Complete,
&context.getSettingsRef(), &context.getSettingsRef(),
nullptr, &context.getClientInfo(),
true); true);
sendExternalTables(); sendExternalTables();
@ -946,7 +948,15 @@ private:
if (!parsed_insert_query.data && (is_interactive || (!stdin_is_a_tty && std_in.eof()))) if (!parsed_insert_query.data && (is_interactive || (!stdin_is_a_tty && std_in.eof())))
throw Exception("No data to insert", ErrorCodes::NO_DATA_TO_INSERT); throw Exception("No data to insert", ErrorCodes::NO_DATA_TO_INSERT);
connection->sendQuery(connection_parameters.timeouts, query_without_data, query_id, QueryProcessingStage::Complete, &context.getSettingsRef(), nullptr, true); connection->sendQuery(
connection_parameters.timeouts,
query_without_data,
query_id,
QueryProcessingStage::Complete,
&context.getSettingsRef(),
&context.getClientInfo(),
true);
sendExternalTables(); sendExternalTables();
/// Receive description of table structure. /// Receive description of table structure.
@ -1719,6 +1729,7 @@ public:
*/ */
("password", po::value<std::string>()->implicit_value("\n", ""), "password") ("password", po::value<std::string>()->implicit_value("\n", ""), "password")
("ask-password", "ask-password") ("ask-password", "ask-password")
("quota_key", po::value<std::string>(), "A string to differentiate quotas when the user have keyed quotas configured on server")
("query_id", po::value<std::string>(), "query_id") ("query_id", po::value<std::string>(), "query_id")
("query,q", po::value<std::string>(), "query") ("query,q", po::value<std::string>(), "query")
("database,d", po::value<std::string>(), "database") ("database,d", po::value<std::string>(), "database")
@ -1854,6 +1865,8 @@ public:
config().setString("password", options["password"].as<std::string>()); config().setString("password", options["password"].as<std::string>());
if (options.count("ask-password")) if (options.count("ask-password"))
config().setBool("ask-password", true); config().setBool("ask-password", true);
if (options.count("quota_key"))
config().setString("quota_key", options["quota_key"].as<std::string>());
if (options.count("multiline")) if (options.count("multiline"))
config().setBool("multiline", true); config().setBool("multiline", true);
if (options.count("multiquery")) if (options.count("multiquery"))

View File

@ -29,8 +29,10 @@ ConnectionParameters::ConnectionParameters(const Poco::Util::AbstractConfigurati
"port", config.getInt(is_secure ? "tcp_port_secure" : "tcp_port", is_secure ? DBMS_DEFAULT_SECURE_PORT : DBMS_DEFAULT_PORT)); "port", config.getInt(is_secure ? "tcp_port_secure" : "tcp_port", is_secure ? DBMS_DEFAULT_SECURE_PORT : DBMS_DEFAULT_PORT));
default_database = config.getString("database", ""); default_database = config.getString("database", "");
/// changed the default value to "default" to fix the issue when the user in the prompt is blank /// changed the default value to "default" to fix the issue when the user in the prompt is blank
user = config.getString("user", "default"); user = config.getString("user", "default");
bool password_prompt = false; bool password_prompt = false;
if (config.getBool("ask-password", false)) if (config.getBool("ask-password", false))
{ {
@ -52,6 +54,7 @@ ConnectionParameters::ConnectionParameters(const Poco::Util::AbstractConfigurati
if (auto result = readpassphrase(prompt.c_str(), buf, sizeof(buf), 0)) if (auto result = readpassphrase(prompt.c_str(), buf, sizeof(buf), 0))
password = result; password = result;
} }
compression = config.getBool("compression", true) ? Protocol::Compression::Enable : Protocol::Compression::Disable; compression = config.getBool("compression", true) ? Protocol::Compression::Enable : Protocol::Compression::Disable;
timeouts = ConnectionTimeouts( timeouts = ConnectionTimeouts(

View File

@ -23,7 +23,6 @@ struct ConnectionParameters
ConnectionTimeouts timeouts; ConnectionTimeouts timeouts;
ConnectionParameters() {} ConnectionParameters() {}
ConnectionParameters(const Poco::Util::AbstractConfiguration & config); ConnectionParameters(const Poco::Util::AbstractConfiguration & config);
}; };

View File

@ -279,7 +279,7 @@ void LocalServer::processQueries()
context->makeSessionContext(); context->makeSessionContext();
context->makeQueryContext(); context->makeQueryContext();
context->setUser("default", "", Poco::Net::SocketAddress{}, ""); context->setUser("default", "", Poco::Net::SocketAddress{});
context->setCurrentQueryId(""); context->setCurrentQueryId("");
applyCmdSettings(); applyCmdSettings();

View File

@ -283,8 +283,10 @@ void HTTPHandler::processQuery(
} }
std::string query_id = params.get("query_id", ""); std::string query_id = params.get("query_id", "");
context.setUser(user, password, request.clientAddress(), quota_key); context.setUser(user, password, request.clientAddress());
context.setCurrentQueryId(query_id); context.setCurrentQueryId(query_id);
if (!quota_key.empty())
context.setQuotaKey(quota_key);
/// The user could specify session identifier and session timeout. /// The user could specify session identifier and session timeout.
/// It allows to modify settings, create temporary tables and reuse them in subsequent requests. /// It allows to modify settings, create temporary tables and reuse them in subsequent requests.

View File

@ -735,7 +735,7 @@ void TCPHandler::receiveHello()
<< (!user.empty() ? ", user: " + user : "") << (!user.empty() ? ", user: " + user : "")
<< "."); << ".");
connection_context.setUser(user, password, socket().peerAddress(), ""); connection_context.setUser(user, password, socket().peerAddress());
} }

View File

@ -61,8 +61,11 @@ void Connection::connect(const ConnectionTimeouts & timeouts)
if (connected) if (connected)
disconnect(); disconnect();
LOG_TRACE(log_wrapper.get(), "Connecting. Database: " << (default_database.empty() ? "(not specified)" : default_database) << ". User: " << user LOG_TRACE(log_wrapper.get(), "Connecting. Database: "
<< (static_cast<bool>(secure) ? ". Secure" : "") << (static_cast<bool>(compression) ? "" : ". Uncompressed")); << (default_database.empty() ? "(not specified)" : default_database)
<< ". User: " << user
<< (static_cast<bool>(secure) ? ". Secure" : "")
<< (static_cast<bool>(compression) ? "" : ". Uncompressed"));
if (static_cast<bool>(secure)) if (static_cast<bool>(secure))
{ {
@ -165,12 +168,14 @@ void Connection::sendHello()
|| has_control_character(password)) || has_control_character(password))
throw Exception("Parameters 'default_database', 'user' and 'password' must not contain ASCII control characters", ErrorCodes::BAD_ARGUMENTS); throw Exception("Parameters 'default_database', 'user' and 'password' must not contain ASCII control characters", ErrorCodes::BAD_ARGUMENTS);
auto client_revision = ClickHouseRevision::get();
writeVarUInt(Protocol::Client::Hello, *out); writeVarUInt(Protocol::Client::Hello, *out);
writeStringBinary((DBMS_NAME " ") + client_name, *out); writeStringBinary((DBMS_NAME " ") + client_name, *out);
writeVarUInt(DBMS_VERSION_MAJOR, *out); writeVarUInt(DBMS_VERSION_MAJOR, *out);
writeVarUInt(DBMS_VERSION_MINOR, *out); writeVarUInt(DBMS_VERSION_MINOR, *out);
// NOTE For backward compatibility of the protocol, client cannot send its version_patch. // NOTE For backward compatibility of the protocol, client cannot send its version_patch.
writeVarUInt(ClickHouseRevision::get(), *out); writeVarUInt(client_revision, *out);
writeStringBinary(default_database, *out); writeStringBinary(default_database, *out);
writeStringBinary(user, *out); writeStringBinary(user, *out);
writeStringBinary(password, *out); writeStringBinary(password, *out);
@ -394,23 +399,10 @@ void Connection::sendQuery(
/// Client info. /// Client info.
if (server_revision >= DBMS_MIN_REVISION_WITH_CLIENT_INFO) if (server_revision >= DBMS_MIN_REVISION_WITH_CLIENT_INFO)
{ {
ClientInfo client_info_to_send; if (client_info && !client_info->empty())
client_info->write(*out, server_revision);
if (!client_info || client_info->empty())
{
/// No client info passed - means this query initiated by me.
client_info_to_send.query_kind = ClientInfo::QueryKind::INITIAL_QUERY;
client_info_to_send.fillOSUserHostNameAndVersionInfo();
client_info_to_send.client_name = (DBMS_NAME " ") + client_name;
}
else else
{ ClientInfo().write(*out, server_revision);
/// This query is initiated by another query.
client_info_to_send = *client_info;
client_info_to_send.query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
}
client_info_to_send.write(*out, server_revision);
} }
/// Per query settings. /// Per query settings.

View File

@ -16,7 +16,7 @@ namespace DB
* void thread() * void thread()
* { * {
* auto connection = pool.get(); * auto connection = pool.get();
* connection->sendQuery("SELECT 'Hello, world!' AS world"); * connection->sendQuery(...);
* } * }
*/ */

View File

@ -94,7 +94,7 @@ void MultiplexedConnections::sendQuery(
const String & query, const String & query,
const String & query_id, const String & query_id,
UInt64 stage, UInt64 stage,
const ClientInfo * client_info, const ClientInfo & client_info,
bool with_pending_data) bool with_pending_data)
{ {
std::lock_guard lock(cancel_mutex); std::lock_guard lock(cancel_mutex);
@ -126,14 +126,14 @@ void MultiplexedConnections::sendQuery(
{ {
modified_settings.parallel_replica_offset = i; modified_settings.parallel_replica_offset = i;
replica_states[i].connection->sendQuery(timeouts, query, query_id, replica_states[i].connection->sendQuery(timeouts, query, query_id,
stage, &modified_settings, client_info, with_pending_data); stage, &modified_settings, &client_info, with_pending_data);
} }
} }
else else
{ {
/// Use single replica. /// Use single replica.
replica_states[0].connection->sendQuery(timeouts, query, query_id, stage, replica_states[0].connection->sendQuery(timeouts, query, query_id,
&modified_settings, client_info, with_pending_data); stage, &modified_settings, &client_info, with_pending_data);
} }
sent_query = true; sent_query = true;

View File

@ -36,10 +36,10 @@ public:
void sendQuery( void sendQuery(
const ConnectionTimeouts & timeouts, const ConnectionTimeouts & timeouts,
const String & query, const String & query,
const String & query_id = "", const String & query_id,
UInt64 stage = QueryProcessingStage::Complete, UInt64 stage,
const ClientInfo * client_info = nullptr, const ClientInfo & client_info,
bool with_pending_data = false); bool with_pending_data);
/// Get packet from any replica. /// Get packet from any replica.
Packet receivePacket(); Packet receivePacket();

View File

@ -959,7 +959,7 @@ public:
if (auth_response->empty()) if (auth_response->empty())
{ {
context.setUser(user_name, "", address, ""); context.setUser(user_name, "", address);
return; return;
} }
@ -982,7 +982,7 @@ public:
{ {
password_sha1[i] = digest[i] ^ static_cast<unsigned char>((*auth_response)[i]); password_sha1[i] = digest[i] ^ static_cast<unsigned char>((*auth_response)[i]);
} }
context.setUser(user_name, password_sha1, address, ""); context.setUser(user_name, password_sha1, address);
} }
private: private:
String scramble; String scramble;
@ -1124,7 +1124,7 @@ public:
password.pop_back(); password.pop_back();
} }
context.setUser(user_name, password, address, ""); context.setUser(user_name, password, address);
} }
private: private:

View File

@ -347,7 +347,10 @@ void RemoteBlockInputStream::sendQuery()
established = true; established = true;
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(settings); auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(settings);
multiplexed_connections->sendQuery(timeouts, query, query_id, stage, &context.getClientInfo(), true); ClientInfo modified_client_info = context.getClientInfo();
modified_client_info.query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
multiplexed_connections->sendQuery(timeouts, query, query_id, stage, modified_client_info, true);
established = false; established = false;
sent_query = true; sent_query = true;

View File

@ -21,14 +21,17 @@ namespace ErrorCodes
RemoteBlockOutputStream::RemoteBlockOutputStream(Connection & connection_, RemoteBlockOutputStream::RemoteBlockOutputStream(Connection & connection_,
const ConnectionTimeouts & timeouts, const ConnectionTimeouts & timeouts,
const String & query_, const String & query_,
const Settings * settings_, const Settings & settings_,
const ClientInfo * client_info_) const ClientInfo & client_info_)
: connection(connection_), query(query_), settings(settings_), client_info(client_info_) : connection(connection_), query(query_)
{ {
/** Send query and receive "header", that describe table structure. ClientInfo modified_client_info = client_info_;
modified_client_info.query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
/** Send query and receive "header", that describes table structure.
* Header is needed to know, what structure is required for blocks to be passed to 'write' method. * Header is needed to know, what structure is required for blocks to be passed to 'write' method.
*/ */
connection.sendQuery(timeouts, query, "", QueryProcessingStage::Complete, settings, client_info); connection.sendQuery(timeouts, query, "", QueryProcessingStage::Complete, &settings_, &modified_client_info);
while (true) while (true)
{ {

View File

@ -22,8 +22,8 @@ public:
RemoteBlockOutputStream(Connection & connection_, RemoteBlockOutputStream(Connection & connection_,
const ConnectionTimeouts & timeouts, const ConnectionTimeouts & timeouts,
const String & query_, const String & query_,
const Settings * settings_ = nullptr, const Settings & settings_,
const ClientInfo * client_info_ = nullptr); const ClientInfo & client_info_);
Block getHeader() const override { return header; } Block getHeader() const override { return header; }
@ -38,8 +38,6 @@ public:
private: private:
Connection & connection; Connection & connection;
String query; String query;
const Settings * settings;
const ClientInfo * client_info;
Block header; Block header;
bool finished = false; bool finished = false;
}; };

View File

@ -74,7 +74,7 @@ ClickHouseDictionarySource::ClickHouseDictionarySource(
, load_all_query{query_builder.composeLoadAllQuery()} , load_all_query{query_builder.composeLoadAllQuery()}
{ {
/// We should set user info even for the case when the dictionary is loaded in-process (without TCP communication). /// We should set user info even for the case when the dictionary is loaded in-process (without TCP communication).
context.setUser(user, password, Poco::Net::SocketAddress("127.0.0.1", 0), {}); context.setUser(user, password, Poco::Net::SocketAddress("127.0.0.1", 0));
context = copyContextAndApplySettings(path_to_settings, context, config); context = copyContextAndApplySettings(path_to_settings, context, config);
/// Query context is needed because some code in executeQuery function may assume it exists. /// Query context is needed because some code in executeQuery function may assume it exists.

View File

@ -116,6 +116,14 @@ void ClientInfo::read(ReadBuffer & in, const UInt64 client_protocol_revision)
} }
void ClientInfo::setInitialQuery()
{
query_kind = QueryKind::INITIAL_QUERY;
fillOSUserHostNameAndVersionInfo();
client_name = (DBMS_NAME " ") + client_name;
}
void ClientInfo::fillOSUserHostNameAndVersionInfo() void ClientInfo::fillOSUserHostNameAndVersionInfo()
{ {
os_user.resize(256, '\0'); os_user.resize(256, '\0');

View File

@ -84,6 +84,10 @@ public:
void write(WriteBuffer & out, const UInt64 server_protocol_revision) const; void write(WriteBuffer & out, const UInt64 server_protocol_revision) const;
void read(ReadBuffer & in, const UInt64 client_protocol_revision); void read(ReadBuffer & in, const UInt64 client_protocol_revision);
/// Initialize parameters on client initiating query.
void setInitialQuery();
private:
void fillOSUserHostNameAndVersionInfo(); void fillOSUserHostNameAndVersionInfo();
}; };

View File

@ -648,15 +648,13 @@ ConfigurationPtr Context::getUsersConfig()
} }
void Context::setUser(const String & name, const String & password, const Poco::Net::SocketAddress & address, const String & quota_key) void Context::setUser(const String & name, const String & password, const Poco::Net::SocketAddress & address)
{ {
auto lock = getLock(); auto lock = getLock();
client_info.current_user = name; client_info.current_user = name;
client_info.current_password = password; client_info.current_password = password;
client_info.current_address = address; client_info.current_address = address;
if (!quota_key.empty())
client_info.quota_key = quota_key;
auto new_user_id = getAccessControlManager().find<User>(name); auto new_user_id = getAccessControlManager().find<User>(name);
std::shared_ptr<const ContextAccess> new_access; std::shared_ptr<const ContextAccess> new_access;
@ -686,6 +684,12 @@ std::shared_ptr<const User> Context::getUser() const
return getAccess()->getUser(); return getAccess()->getUser();
} }
void Context::setQuotaKey(String quota_key_)
{
auto lock = getLock();
client_info.quota_key = std::move(quota_key_);
}
String Context::getUserName() const String Context::getUserName() const
{ {
return getAccess()->getUserName(); return getAccess()->getUserName();

View File

@ -250,7 +250,8 @@ public:
/// Sets the current user, checks the password and that the specified host is allowed. /// Sets the current user, checks the password and that the specified host is allowed.
/// Must be called before getClientInfo. /// Must be called before getClientInfo.
void setUser(const String & name, const String & password, const Poco::Net::SocketAddress & address, const String & quota_key); void setUser(const String & name, const String & password, const Poco::Net::SocketAddress & address);
void setQuotaKey(String quota_key_);
UserPtr getUser() const; UserPtr getUser() const;
String getUserName() const; String getUserName() const;

View File

@ -276,7 +276,7 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa
ClientInfo client_info; ClientInfo client_info;
readHeader(in, insert_settings, insert_query, client_info, log); readHeader(in, insert_settings, insert_query, client_info, log);
RemoteBlockOutputStream remote{*connection, timeouts, insert_query, &insert_settings, &client_info}; RemoteBlockOutputStream remote{*connection, timeouts, insert_query, insert_settings, client_info};
remote.writePrefix(); remote.writePrefix();
remote.writePrepared(in); remote.writePrepared(in);
@ -465,7 +465,7 @@ struct StorageDistributedDirectoryMonitor::Batch
if (first) if (first)
{ {
first = false; first = false;
remote = std::make_unique<RemoteBlockOutputStream>(*connection, timeouts, insert_query, &insert_settings, &client_info); remote = std::make_unique<RemoteBlockOutputStream>(*connection, timeouts, insert_query, insert_settings, client_info);
remote->writePrefix(); remote->writePrefix();
} }

View File

@ -312,7 +312,7 @@ ThreadPool::Job DistributedBlockOutputStream::runWritingJob(DistributedBlockOutp
if (throttler) if (throttler)
job.connection_entry->setThrottler(throttler); job.connection_entry->setThrottler(throttler);
job.stream = std::make_shared<RemoteBlockOutputStream>(*job.connection_entry, timeouts, query_string, &settings, &context.getClientInfo()); job.stream = std::make_shared<RemoteBlockOutputStream>(*job.connection_entry, timeouts, query_string, settings, context.getClientInfo());
job.stream->writePrefix(); job.stream->writePrefix();
} }

View File

@ -0,0 +1,2 @@
1
Hello

View File

@ -0,0 +1,6 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
$CLICKHOUSE_CLIENT --quota_key Hello --query_id test_quota_key --log_queries 1 --multiquery --query "SELECT 1; SYSTEM FLUSH LOGS; SELECT DISTINCT quota_key FROM system.query_log WHERE event_date >= yesterday() AND event_time >= now() - 300 AND query_id = 'test_quota_key'"