mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 15:12:02 +00:00
Implemented in a different way
This commit is contained in:
parent
c22f91db01
commit
fe59524481
@ -397,6 +397,10 @@ private:
|
|||||||
ignore_error = config().getBool("ignore-error", false);
|
ignore_error = config().getBool("ignore-error", false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ClientInfo & client_info = context.getClientInfo();
|
||||||
|
client_info.setInitialQuery();
|
||||||
|
client_info.quota_key = config().getString("quota_key", "");
|
||||||
|
|
||||||
connect();
|
connect();
|
||||||
|
|
||||||
/// Initialize DateLUT here to avoid counting time spent here as query execution time.
|
/// Initialize DateLUT here to avoid counting time spent here as query execution time.
|
||||||
@ -589,9 +593,6 @@ private:
|
|||||||
connection_parameters.compression,
|
connection_parameters.compression,
|
||||||
connection_parameters.security);
|
connection_parameters.security);
|
||||||
|
|
||||||
if (!connection_parameters.quota_key.empty())
|
|
||||||
connection->setQuotaKey(connection_parameters.quota_key);
|
|
||||||
|
|
||||||
String server_name;
|
String server_name;
|
||||||
UInt64 server_version_major = 0;
|
UInt64 server_version_major = 0;
|
||||||
UInt64 server_version_minor = 0;
|
UInt64 server_version_minor = 0;
|
||||||
@ -906,7 +907,7 @@ private:
|
|||||||
query_id,
|
query_id,
|
||||||
QueryProcessingStage::Complete,
|
QueryProcessingStage::Complete,
|
||||||
&context.getSettingsRef(),
|
&context.getSettingsRef(),
|
||||||
nullptr,
|
&context.getClientInfo(),
|
||||||
true);
|
true);
|
||||||
|
|
||||||
sendExternalTables();
|
sendExternalTables();
|
||||||
@ -938,7 +939,15 @@ private:
|
|||||||
if (!parsed_insert_query.data && (is_interactive || (!stdin_is_a_tty && std_in.eof())))
|
if (!parsed_insert_query.data && (is_interactive || (!stdin_is_a_tty && std_in.eof())))
|
||||||
throw Exception("No data to insert", ErrorCodes::NO_DATA_TO_INSERT);
|
throw Exception("No data to insert", ErrorCodes::NO_DATA_TO_INSERT);
|
||||||
|
|
||||||
connection->sendQuery(connection_parameters.timeouts, query_without_data, query_id, QueryProcessingStage::Complete, &context.getSettingsRef(), nullptr, true);
|
connection->sendQuery(
|
||||||
|
connection_parameters.timeouts,
|
||||||
|
query_without_data,
|
||||||
|
query_id,
|
||||||
|
QueryProcessingStage::Complete,
|
||||||
|
&context.getSettingsRef(),
|
||||||
|
&context.getClientInfo(),
|
||||||
|
true);
|
||||||
|
|
||||||
sendExternalTables();
|
sendExternalTables();
|
||||||
|
|
||||||
/// Receive description of table structure.
|
/// Receive description of table structure.
|
||||||
|
@ -55,8 +55,6 @@ ConnectionParameters::ConnectionParameters(const Poco::Util::AbstractConfigurati
|
|||||||
password = result;
|
password = result;
|
||||||
}
|
}
|
||||||
|
|
||||||
quota_key = config.getString("quota_key", "");
|
|
||||||
|
|
||||||
compression = config.getBool("compression", true) ? Protocol::Compression::Enable : Protocol::Compression::Disable;
|
compression = config.getBool("compression", true) ? Protocol::Compression::Enable : Protocol::Compression::Disable;
|
||||||
|
|
||||||
timeouts = ConnectionTimeouts(
|
timeouts = ConnectionTimeouts(
|
||||||
|
@ -18,7 +18,6 @@ struct ConnectionParameters
|
|||||||
std::string default_database;
|
std::string default_database;
|
||||||
std::string user;
|
std::string user;
|
||||||
std::string password;
|
std::string password;
|
||||||
std::string quota_key;
|
|
||||||
Protocol::Secure security = Protocol::Secure::Disable;
|
Protocol::Secure security = Protocol::Secure::Disable;
|
||||||
Protocol::Compression compression = Protocol::Compression::Enable;
|
Protocol::Compression compression = Protocol::Compression::Enable;
|
||||||
ConnectionTimeouts timeouts;
|
ConnectionTimeouts timeouts;
|
||||||
|
@ -276,7 +276,7 @@ void LocalServer::processQueries()
|
|||||||
context->makeSessionContext();
|
context->makeSessionContext();
|
||||||
context->makeQueryContext();
|
context->makeQueryContext();
|
||||||
|
|
||||||
context->setUser("default", "", Poco::Net::SocketAddress{}, "");
|
context->setUser("default", "", Poco::Net::SocketAddress{});
|
||||||
context->setCurrentQueryId("");
|
context->setCurrentQueryId("");
|
||||||
applyCmdSettings();
|
applyCmdSettings();
|
||||||
|
|
||||||
|
@ -267,8 +267,10 @@ void HTTPHandler::processQuery(
|
|||||||
}
|
}
|
||||||
|
|
||||||
std::string query_id = params.get("query_id", "");
|
std::string query_id = params.get("query_id", "");
|
||||||
context.setUser(user, password, request.clientAddress(), quota_key);
|
context.setUser(user, password, request.clientAddress());
|
||||||
context.setCurrentQueryId(query_id);
|
context.setCurrentQueryId(query_id);
|
||||||
|
if (!quota_key.empty())
|
||||||
|
context.setQuotaKey(quota_key);
|
||||||
|
|
||||||
/// The user could specify session identifier and session timeout.
|
/// The user could specify session identifier and session timeout.
|
||||||
/// It allows to modify settings, create temporary tables and reuse them in subsequent requests.
|
/// It allows to modify settings, create temporary tables and reuse them in subsequent requests.
|
||||||
|
@ -743,7 +743,6 @@ void TCPHandler::receiveHello()
|
|||||||
UInt64 packet_type = 0;
|
UInt64 packet_type = 0;
|
||||||
String user = "default";
|
String user = "default";
|
||||||
String password;
|
String password;
|
||||||
String quota_key;
|
|
||||||
|
|
||||||
readVarUInt(packet_type, *in);
|
readVarUInt(packet_type, *in);
|
||||||
if (packet_type != Protocol::Client::Hello)
|
if (packet_type != Protocol::Client::Hello)
|
||||||
@ -773,9 +772,6 @@ void TCPHandler::receiveHello()
|
|||||||
readStringBinary(user, *in);
|
readStringBinary(user, *in);
|
||||||
readStringBinary(password, *in);
|
readStringBinary(password, *in);
|
||||||
|
|
||||||
if (client_revision >= DBMS_MIN_REVISION_WITH_CLIENT_PROVIDED_QUOTA_KEY)
|
|
||||||
readStringBinary(quota_key, *in);
|
|
||||||
|
|
||||||
LOG_DEBUG(log, "Connected " << client_name
|
LOG_DEBUG(log, "Connected " << client_name
|
||||||
<< " version " << client_version_major
|
<< " version " << client_version_major
|
||||||
<< "." << client_version_minor
|
<< "." << client_version_minor
|
||||||
@ -783,28 +779,24 @@ void TCPHandler::receiveHello()
|
|||||||
<< ", revision: " << client_revision
|
<< ", revision: " << client_revision
|
||||||
<< (!default_database.empty() ? ", database: " + default_database : "")
|
<< (!default_database.empty() ? ", database: " + default_database : "")
|
||||||
<< (!user.empty() ? ", user: " + user : "")
|
<< (!user.empty() ? ", user: " + user : "")
|
||||||
<< (!quota_key.empty() ? ", quota_key: " + quota_key : "") /// quota key is not secret info.
|
|
||||||
<< ".");
|
<< ".");
|
||||||
|
|
||||||
connection_context.setUser(user, password, socket().peerAddress(), quota_key);
|
connection_context.setUser(user, password, socket().peerAddress());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void TCPHandler::receiveUnexpectedHello()
|
void TCPHandler::receiveUnexpectedHello()
|
||||||
{
|
{
|
||||||
UInt64 skip_uint_64;
|
UInt64 skip_uint_64;
|
||||||
UInt64 client_revision;
|
|
||||||
String skip_string;
|
String skip_string;
|
||||||
|
|
||||||
readStringBinary(skip_string, *in);
|
readStringBinary(skip_string, *in);
|
||||||
readVarUInt(skip_uint_64, *in);
|
readVarUInt(skip_uint_64, *in);
|
||||||
readVarUInt(skip_uint_64, *in);
|
readVarUInt(skip_uint_64, *in);
|
||||||
readVarUInt(client_revision, *in);
|
readVarUInt(skip_uint_64, *in);
|
||||||
readStringBinary(skip_string, *in);
|
readStringBinary(skip_string, *in);
|
||||||
readStringBinary(skip_string, *in);
|
readStringBinary(skip_string, *in);
|
||||||
readStringBinary(skip_string, *in);
|
readStringBinary(skip_string, *in);
|
||||||
if (client_revision >= DBMS_MIN_REVISION_WITH_CLIENT_PROVIDED_QUOTA_KEY)
|
|
||||||
readStringBinary(skip_string, *in);
|
|
||||||
|
|
||||||
throw NetException("Unexpected packet Hello received from client", ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT);
|
throw NetException("Unexpected packet Hello received from client", ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT);
|
||||||
}
|
}
|
||||||
|
@ -177,9 +177,6 @@ void Connection::sendHello()
|
|||||||
writeStringBinary(user, *out);
|
writeStringBinary(user, *out);
|
||||||
writeStringBinary(password, *out);
|
writeStringBinary(password, *out);
|
||||||
|
|
||||||
if (client_revision >= DBMS_MIN_REVISION_WITH_CLIENT_PROVIDED_QUOTA_KEY)
|
|
||||||
writeStringBinary(quota_key, *out);
|
|
||||||
|
|
||||||
out->next();
|
out->next();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -404,9 +401,7 @@ void Connection::sendQuery(
|
|||||||
if (!client_info || client_info->empty())
|
if (!client_info || client_info->empty())
|
||||||
{
|
{
|
||||||
/// No client info passed - means this query initiated by me.
|
/// No client info passed - means this query initiated by me.
|
||||||
client_info_to_send.query_kind = ClientInfo::QueryKind::INITIAL_QUERY;
|
client_info_to_send.setInitialQuery();
|
||||||
client_info_to_send.fillOSUserHostNameAndVersionInfo();
|
|
||||||
client_info_to_send.client_name = (DBMS_NAME " ") + client_name;
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -104,11 +104,6 @@ public:
|
|||||||
|
|
||||||
virtual ~Connection() {}
|
virtual ~Connection() {}
|
||||||
|
|
||||||
void setQuotaKey(String quota_key_)
|
|
||||||
{
|
|
||||||
quota_key = std::move(quota_key_);
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Set throttler of network traffic. One throttler could be used for multiple connections to limit total traffic.
|
/// Set throttler of network traffic. One throttler could be used for multiple connections to limit total traffic.
|
||||||
void setThrottler(const ThrottlerPtr & throttler_)
|
void setThrottler(const ThrottlerPtr & throttler_)
|
||||||
{
|
{
|
||||||
@ -191,7 +186,6 @@ private:
|
|||||||
String default_database;
|
String default_database;
|
||||||
String user;
|
String user;
|
||||||
String password;
|
String password;
|
||||||
String quota_key;
|
|
||||||
|
|
||||||
/// Address is resolved during the first connection (or the following reconnects)
|
/// Address is resolved during the first connection (or the following reconnects)
|
||||||
/// Use it only for logging purposes
|
/// Use it only for logging purposes
|
||||||
|
@ -67,9 +67,6 @@
|
|||||||
/// Mininum revision supporting SettingsBinaryFormat::STRINGS.
|
/// Mininum revision supporting SettingsBinaryFormat::STRINGS.
|
||||||
#define DBMS_MIN_REVISION_WITH_SETTINGS_SERIALIZED_AS_STRINGS 54429
|
#define DBMS_MIN_REVISION_WITH_SETTINGS_SERIALIZED_AS_STRINGS 54429
|
||||||
|
|
||||||
/// Minimum revision when client sends quota key.
|
|
||||||
#define DBMS_MIN_REVISION_WITH_CLIENT_PROVIDED_QUOTA_KEY 54435
|
|
||||||
|
|
||||||
/// Version of ClickHouse TCP protocol. Set to git tag with latest protocol change.
|
/// Version of ClickHouse TCP protocol. Set to git tag with latest protocol change.
|
||||||
#define DBMS_TCP_PROTOCOL_VERSION 54226
|
#define DBMS_TCP_PROTOCOL_VERSION 54226
|
||||||
|
|
||||||
|
@ -955,7 +955,7 @@ public:
|
|||||||
|
|
||||||
if (auth_response->empty())
|
if (auth_response->empty())
|
||||||
{
|
{
|
||||||
context.setUser(user_name, "", address, "");
|
context.setUser(user_name, "", address);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -978,7 +978,7 @@ public:
|
|||||||
{
|
{
|
||||||
password_sha1[i] = digest[i] ^ static_cast<unsigned char>((*auth_response)[i]);
|
password_sha1[i] = digest[i] ^ static_cast<unsigned char>((*auth_response)[i]);
|
||||||
}
|
}
|
||||||
context.setUser(user_name, password_sha1, address, "");
|
context.setUser(user_name, password_sha1, address);
|
||||||
}
|
}
|
||||||
private:
|
private:
|
||||||
String scramble;
|
String scramble;
|
||||||
@ -1120,7 +1120,7 @@ public:
|
|||||||
password.pop_back();
|
password.pop_back();
|
||||||
}
|
}
|
||||||
|
|
||||||
context.setUser(user_name, password, address, "");
|
context.setUser(user_name, password, address);
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
@ -73,7 +73,7 @@ ClickHouseDictionarySource::ClickHouseDictionarySource(
|
|||||||
, load_all_query{query_builder.composeLoadAllQuery()}
|
, load_all_query{query_builder.composeLoadAllQuery()}
|
||||||
{
|
{
|
||||||
/// We should set user info even for the case when the dictionary is loaded in-process (without TCP communication).
|
/// We should set user info even for the case when the dictionary is loaded in-process (without TCP communication).
|
||||||
context.setUser(user, password, Poco::Net::SocketAddress("127.0.0.1", 0), {});
|
context.setUser(user, password, Poco::Net::SocketAddress("127.0.0.1", 0));
|
||||||
/// Processors are not supported here yet.
|
/// Processors are not supported here yet.
|
||||||
context.setSetting("experimental_use_processors", false);
|
context.setSetting("experimental_use_processors", false);
|
||||||
/// Query context is needed because some code in executeQuery function may assume it exists.
|
/// Query context is needed because some code in executeQuery function may assume it exists.
|
||||||
|
@ -113,6 +113,14 @@ void ClientInfo::read(ReadBuffer & in, const UInt64 client_protocol_revision)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void ClientInfo::setInitialQuery()
|
||||||
|
{
|
||||||
|
query_kind = QueryKind::INITIAL_QUERY;
|
||||||
|
fillOSUserHostNameAndVersionInfo();
|
||||||
|
client_name = (DBMS_NAME " ") + client_name;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
void ClientInfo::fillOSUserHostNameAndVersionInfo()
|
void ClientInfo::fillOSUserHostNameAndVersionInfo()
|
||||||
{
|
{
|
||||||
os_user.resize(256, '\0');
|
os_user.resize(256, '\0');
|
||||||
|
@ -84,6 +84,10 @@ public:
|
|||||||
void write(WriteBuffer & out, const UInt64 server_protocol_revision) const;
|
void write(WriteBuffer & out, const UInt64 server_protocol_revision) const;
|
||||||
void read(ReadBuffer & in, const UInt64 client_protocol_revision);
|
void read(ReadBuffer & in, const UInt64 client_protocol_revision);
|
||||||
|
|
||||||
|
/// Initialize parameters on client initiating query.
|
||||||
|
void setInitialQuery();
|
||||||
|
|
||||||
|
private:
|
||||||
void fillOSUserHostNameAndVersionInfo();
|
void fillOSUserHostNameAndVersionInfo();
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -620,15 +620,13 @@ ConfigurationPtr Context::getUsersConfig()
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void Context::setUser(const String & name, const String & password, const Poco::Net::SocketAddress & address, const String & quota_key)
|
void Context::setUser(const String & name, const String & password, const Poco::Net::SocketAddress & address)
|
||||||
{
|
{
|
||||||
auto lock = getLock();
|
auto lock = getLock();
|
||||||
|
|
||||||
client_info.current_user = name;
|
client_info.current_user = name;
|
||||||
client_info.current_password = password;
|
client_info.current_password = password;
|
||||||
client_info.current_address = address;
|
client_info.current_address = address;
|
||||||
if (!quota_key.empty())
|
|
||||||
client_info.quota_key = quota_key;
|
|
||||||
|
|
||||||
auto new_user_id = getAccessControlManager().find<User>(name);
|
auto new_user_id = getAccessControlManager().find<User>(name);
|
||||||
std::shared_ptr<const ContextAccess> new_access;
|
std::shared_ptr<const ContextAccess> new_access;
|
||||||
@ -659,6 +657,12 @@ std::shared_ptr<const User> Context::getUser() const
|
|||||||
return access->getUser();
|
return access->getUser();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void Context::setQuotaKey(String quota_key_)
|
||||||
|
{
|
||||||
|
auto lock = getLock();
|
||||||
|
client_info.quota_key = std::move(quota_key_);
|
||||||
|
}
|
||||||
|
|
||||||
String Context::getUserName() const
|
String Context::getUserName() const
|
||||||
{
|
{
|
||||||
auto lock = getLock();
|
auto lock = getLock();
|
||||||
|
@ -228,7 +228,8 @@ public:
|
|||||||
|
|
||||||
/// Sets the current user, checks the password and that the specified host is allowed.
|
/// Sets the current user, checks the password and that the specified host is allowed.
|
||||||
/// Must be called before getClientInfo.
|
/// Must be called before getClientInfo.
|
||||||
void setUser(const String & name, const String & password, const Poco::Net::SocketAddress & address, const String & quota_key);
|
void setUser(const String & name, const String & password, const Poco::Net::SocketAddress & address);
|
||||||
|
void setQuotaKey(String quota_key_);
|
||||||
|
|
||||||
UserPtr getUser() const;
|
UserPtr getUser() const;
|
||||||
String getUserName() const;
|
String getUserName() const;
|
||||||
|
@ -409,6 +409,8 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
|
|||||||
|
|
||||||
elem.client_info = context.getClientInfo();
|
elem.client_info = context.getClientInfo();
|
||||||
|
|
||||||
|
std::cerr << "Quota key: " << elem.client_info.quota_key << "\n";
|
||||||
|
|
||||||
bool log_queries = settings.log_queries && !internal;
|
bool log_queries = settings.log_queries && !internal;
|
||||||
|
|
||||||
/// Log into system table start of query execution, if need.
|
/// Log into system table start of query execution, if need.
|
||||||
|
Loading…
Reference in New Issue
Block a user