update tcp protocol, add quota_key

This commit is contained in:
Yakov Olkhovskiy 2022-08-03 15:44:08 -04:00
parent b33fe26d8c
commit 2e34b384c1
20 changed files with 73 additions and 14 deletions

View File

@ -61,7 +61,7 @@ public:
Benchmark(unsigned concurrency_, double delay_,
Strings && hosts_, Ports && ports_, bool round_robin_,
bool cumulative_, bool secure_, const String & default_database_,
const String & user_, const String & password_, const String & stage,
const String & user_, const String & password_, const String & quota_key_, const String & stage,
bool randomize_, size_t max_iterations_, double max_time_,
const String & json_path_, size_t confidence_,
const String & query_id_, const String & query_to_execute_, bool continue_on_errors_,
@ -90,7 +90,7 @@ public:
connections.emplace_back(std::make_unique<ConnectionPool>(
concurrency,
cur_host, cur_port,
default_database_, user_, password_,
default_database_, user_, password_, quota_key_,
/* cluster_= */ "",
/* cluster_secret_= */ "",
/* client_name_= */ "benchmark",
@ -607,6 +607,7 @@ int mainEntryClickHouseBenchmark(int argc, char ** argv)
/// So we copy the results to std::string.
std::optional<std::string> env_user_str;
std::optional<std::string> env_password_str;
std::optional<std::string> env_quota_key_str;
const char * env_user = getenv("CLICKHOUSE_USER");
if (env_user != nullptr)
@ -616,6 +617,10 @@ int mainEntryClickHouseBenchmark(int argc, char ** argv)
if (env_password != nullptr)
env_password_str.emplace(std::string(env_password));
const char * env_quota_key = getenv("CLICKHOUSE_QUOTA_KEY");
if (env_quota_key != nullptr)
env_quota_key_str.emplace(std::string(env_quota_key));
boost::program_options::options_description desc = createOptionsDescription("Allowed options", getTerminalWidth());
desc.add_options()
("help", "produce help message")
@ -634,6 +639,7 @@ int mainEntryClickHouseBenchmark(int argc, char ** argv)
("secure,s", "Use TLS connection")
("user,u", value<std::string>()->default_value(env_user_str.value_or("default")), "")
("password", value<std::string>()->default_value(env_password_str.value_or("")), "")
("quota_key", value<std::string>()->default_value(env_quota_key_str.value_or("")), "")
("database", value<std::string>()->default_value("default"), "")
("stacktrace", "print stack traces of exceptions")
("confidence", value<size_t>()->default_value(5), "set the level of confidence for T-test [0=80%, 1=90%, 2=95%, 3=98%, 4=99%, 5=99.5%(default)")
@ -682,6 +688,7 @@ int mainEntryClickHouseBenchmark(int argc, char ** argv)
options["database"].as<std::string>(),
options["user"].as<std::string>(),
options["password"].as<std::string>(),
options["quota_key"].as<std::string>(),
options["stage"].as<std::string>(),
options["randomize"].as<bool>(),
options["iterations"].as<size_t>(),

View File

@ -2010,7 +2010,7 @@ UInt64 ClusterCopier::executeQueryOnCluster(
{
connections.emplace_back(std::make_shared<Connection>(
node.host_name, node.port, node.default_database,
node.user, node.password, node.cluster, node.cluster_secret,
node.user, node.password, node.quota_key, node.cluster, node.cluster_secret,
"ClusterCopier", node.compression, node.secure
));

View File

@ -64,6 +64,7 @@ Connection::~Connection() = default;
Connection::Connection(const String & host_, UInt16 port_,
const String & default_database_,
const String & user_, const String & password_,
const String & quota_key_,
const String & cluster_,
const String & cluster_secret_,
const String & client_name_,
@ -71,7 +72,7 @@ Connection::Connection(const String & host_, UInt16 port_,
Protocol::Secure secure_,
Poco::Timespan sync_request_timeout_)
: host(host_), port(port_), default_database(default_database_)
, user(user_), password(password_)
, user(user_), password(password_), quota_key(quota_key_)
, cluster(cluster_)
, cluster_secret(cluster_secret_)
, client_name(client_name_)
@ -169,6 +170,8 @@ void Connection::connect(const ConnectionTimeouts & timeouts)
sendHello();
receiveHello();
if (server_revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_QUOTA_KEY)
sendAddendum();
LOG_TRACE(log_wrapper.get(), "Connected to {} server version {}.{}.{}.",
server_name, server_version_major, server_version_minor, server_version_patch);
@ -266,6 +269,13 @@ void Connection::sendHello()
}
void Connection::sendAddendum()
{
writeStringBinary(quota_key, *out);
out->next();
}
void Connection::receiveHello()
{
/// Receive hello packet.
@ -1083,6 +1093,7 @@ ServerConnectionPtr Connection::createConnection(const ConnectionParameters & pa
parameters.default_database,
parameters.user,
parameters.password,
parameters.quota_key,
"", /* cluster */
"", /* cluster_secret */
"client",

View File

@ -51,6 +51,7 @@ public:
Connection(const String & host_, UInt16 port_,
const String & default_database_,
const String & user_, const String & password_,
const String & quota_key_,
const String & cluster_,
const String & cluster_secret_,
const String & client_name_,
@ -159,6 +160,7 @@ private:
String default_database;
String user;
String password;
String quota_key;
/// For inter-server authorization
String cluster;
@ -245,6 +247,7 @@ private:
void connect(const ConnectionTimeouts & timeouts);
void sendHello();
void sendAddendum();
void receiveHello();
#if USE_SSL

View File

@ -58,6 +58,7 @@ ConnectionParameters::ConnectionParameters(const Poco::Util::AbstractConfigurati
if (auto * result = readpassphrase(prompt.c_str(), buf, sizeof(buf), 0))
password = result;
}
quota_key = config.getString("quota_key", "");
/// By default compression is disabled if address looks like localhost.
compression = config.getBool("compression", !isLocalAddress(DNSResolver::instance().resolveHost(host)))

View File

@ -18,6 +18,7 @@ struct ConnectionParameters
std::string default_database;
std::string user;
std::string password;
std::string quota_key;
Protocol::Secure security = Protocol::Secure::Disable;
Protocol::Compression compression = Protocol::Compression::Enable;
ConnectionTimeouts timeouts;

View File

@ -12,6 +12,7 @@ ConnectionPoolPtr ConnectionPoolFactory::get(
String default_database,
String user,
String password,
String quota_key,
String cluster,
String cluster_secret,
String client_name,
@ -20,7 +21,7 @@ ConnectionPoolPtr ConnectionPoolFactory::get(
Int64 priority)
{
Key key{
max_connections, host, port, default_database, user, password, cluster, cluster_secret, client_name, compression, secure, priority};
max_connections, host, port, default_database, user, password, quota_key, cluster, cluster_secret, client_name, compression, secure, priority};
std::lock_guard lock(mutex);
auto [it, inserted] = pools.emplace(key, ConnectionPoolPtr{});
@ -37,6 +38,7 @@ ConnectionPoolPtr ConnectionPoolFactory::get(
default_database,
user,
password,
quota_key,
cluster,
cluster_secret,
client_name,

View File

@ -54,6 +54,7 @@ public:
const String & default_database_,
const String & user_,
const String & password_,
const String & quota_key_,
const String & cluster_,
const String & cluster_secret_,
const String & client_name_,
@ -67,6 +68,7 @@ public:
default_database(default_database_),
user(user_),
password(password_),
quota_key(quota_key_),
cluster(cluster_),
cluster_secret(cluster_secret_),
client_name(client_name_),
@ -112,7 +114,7 @@ protected:
{
return std::make_shared<Connection>(
host, port,
default_database, user, password,
default_database, user, password, quota_key,
cluster, cluster_secret,
client_name, compression, secure);
}
@ -123,6 +125,7 @@ private:
String default_database;
String user;
String password;
String quota_key;
/// For inter-server authorization
String cluster;
@ -149,6 +152,7 @@ public:
String default_database;
String user;
String password;
String quota_key;
String cluster;
String cluster_secret;
String client_name;
@ -171,6 +175,7 @@ public:
String default_database,
String user,
String password,
String quota_key,
String cluster,
String cluster_secret,
String client_name,

View File

@ -52,10 +52,12 @@
/// NOTE: DBMS_TCP_PROTOCOL_VERSION has nothing common with VERSION_REVISION,
/// later is just a number for server version (one number instead of commit SHA)
/// for simplicity (sometimes it may be more convenient in some use cases).
#define DBMS_TCP_PROTOCOL_VERSION 54457
#define DBMS_TCP_PROTOCOL_VERSION 54458
#define DBMS_MIN_PROTOCOL_VERSION_WITH_INITIAL_QUERY_START_TIME 54449
#define DBMS_MIN_PROTOCOL_VERSION_WITH_PROFILE_EVENTS_IN_INSERT 54456
#define DBMS_MIN_PROTOCOL_VERSION_WITH_VIEW_IF_PERMITTED 54457
#define DBMS_MIN_PROTOCOL_VERSION_WITH_QUOTA_KEY 54458

View File

@ -29,7 +29,7 @@ namespace ErrorCodes
}
static const std::unordered_set<std::string_view> dictionary_allowed_keys = {
"host", "port", "user", "password", "db", "database", "table",
"host", "port", "user", "password", "quota_key", "db", "database", "table",
"update_field", "update_lag", "invalidate_query", "query", "where", "name", "secure"};
namespace
@ -54,6 +54,7 @@ namespace
configuration.db,
configuration.user,
configuration.password,
configuration.quota_key,
"", /* cluster */
"", /* cluster_secret */
"ClickHouseDictionarySource",
@ -237,6 +238,7 @@ void registerDictionarySourceClickHouse(DictionarySourceFactory & factory)
std::string host = config.getString(settings_config_prefix + ".host", "localhost");
std::string user = config.getString(settings_config_prefix + ".user", "default");
std::string password = config.getString(settings_config_prefix + ".password", "");
std::string quota_key = config.getString(settings_config_prefix + ".quota_key", "");
std::string db = config.getString(settings_config_prefix + ".db", default_database);
std::string table = config.getString(settings_config_prefix + ".table", "");
UInt16 port = static_cast<UInt16>(config.getUInt(settings_config_prefix + ".port", default_port));
@ -252,6 +254,7 @@ void registerDictionarySourceClickHouse(DictionarySourceFactory & factory)
host = configuration.host;
user = configuration.username;
password = configuration.password;
quota_key = configuration.quota_key;
db = configuration.database;
table = configuration.table;
port = configuration.port;
@ -261,6 +264,7 @@ void registerDictionarySourceClickHouse(DictionarySourceFactory & factory)
.host = host,
.user = user,
.password = password,
.quota_key = quota_key,
.db = db,
.table = table,
.query = config.getString(settings_config_prefix + ".query", ""),

View File

@ -23,6 +23,7 @@ public:
const std::string host;
const std::string user;
const std::string password;
const std::string quota_key;
const std::string db;
const std::string table;
const std::string query;

View File

@ -425,7 +425,7 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config,
auto pool = ConnectionPoolFactory::instance().get(
settings.distributed_connections_pool_size,
address.host_name, address.port,
address.default_database, address.user, address.password,
address.default_database, address.user, address.password, address.quota_key,
address.cluster, address.cluster_secret,
"server", address.compression,
address.secure, address.priority);
@ -499,7 +499,7 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config,
auto replica_pool = ConnectionPoolFactory::instance().get(
settings.distributed_connections_pool_size,
replica.host_name, replica.port,
replica.default_database, replica.user, replica.password,
replica.default_database, replica.user, replica.password, replica.quota_key,
replica.cluster, replica.cluster_secret,
"server", replica.compression,
replica.secure, replica.priority);
@ -587,7 +587,7 @@ Cluster::Cluster(
auto replica_pool = ConnectionPoolFactory::instance().get(
settings.distributed_connections_pool_size,
replica.host_name, replica.port,
replica.default_database, replica.user, replica.password,
replica.default_database, replica.user, replica.password, replica.quota_key,
replica.cluster, replica.cluster_secret,
"server", replica.compression, replica.secure, replica.priority);
all_replicas.emplace_back(replica_pool);
@ -699,6 +699,7 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti
address.default_database,
address.user,
address.password,
address.quota_key,
address.cluster,
address.cluster_secret,
"server",

View File

@ -93,6 +93,7 @@ public:
UInt16 port{0};
String user;
String password;
String quota_key;
/// For inter-server authorization
String cluster;

View File

@ -134,6 +134,8 @@ void TCPHandler::runImpl()
{
receiveHello();
sendHello();
if (client_tcp_protocol_version >= DBMS_MIN_PROTOCOL_VERSION_WITH_QUOTA_KEY)
receiveAddendum();
if (!is_interserver_mode) /// In interserver mode queries are executed without a session context.
{
@ -1019,6 +1021,7 @@ std::unique_ptr<Session> TCPHandler::makeSession()
client_info.connection_client_version_patch = client_version_patch;
client_info.connection_tcp_protocol_version = client_tcp_protocol_version;
client_info.quota_key = quota_key;
client_info.interface = interface;
return res;
@ -1077,6 +1080,12 @@ void TCPHandler::receiveHello()
session->authenticate(user, password, socket().peerAddress());
}
void TCPHandler::receiveAddendum()
{
readStringBinary(quota_key, *in);
session->getClientInfo().quota_key = quota_key;
}
void TCPHandler::receiveUnexpectedHello()
{

View File

@ -155,6 +155,7 @@ private:
UInt64 client_version_minor = 0;
UInt64 client_version_patch = 0;
UInt64 client_tcp_protocol_version = 0;
String quota_key;
/// Connection settings, which are extracted from a context.
bool send_exception_with_stack_trace = true;
@ -211,6 +212,7 @@ private:
bool receiveProxyHeader();
void receiveHello();
void receiveAddendum();
bool receivePacket();
void receiveQuery();
void receiveIgnoredPartUUIDs();

View File

@ -538,6 +538,7 @@ ConnectionPoolPtr StorageDistributedDirectoryMonitor::createPool(const std::stri
address.default_database,
address.user,
address.password,
address.quota_key,
address.cluster,
address.cluster_secret,
storage.getName() + '_' + address.user, /* client */

View File

@ -35,7 +35,7 @@ namespace ErrorCodes
IMPLEMENT_SETTINGS_TRAITS(EmptySettingsTraits, EMPTY_SETTINGS)
static const std::unordered_set<std::string_view> dictionary_allowed_keys = {
"host", "port", "user", "password", "db",
"host", "port", "user", "password", "quota_key", "db",
"database", "table", "schema", "replica",
"update_field", "update_lag", "invalidate_query", "query",
"where", "name", "secure", "uri", "collection"};
@ -84,6 +84,7 @@ void ExternalDataSourceConfiguration::set(const ExternalDataSourceConfiguration
port = conf.port;
username = conf.username;
password = conf.password;
quota_key = conf.quota_key;
database = conf.database;
table = conf.table;
schema = conf.schema;
@ -123,6 +124,7 @@ std::optional<ExternalDataSourceInfo> getExternalDataSourceConfiguration(
configuration.port = config.getInt(collection_prefix + ".port", 0);
configuration.username = config.getString(collection_prefix + ".user", "");
configuration.password = config.getString(collection_prefix + ".password", "");
configuration.quota_key = config.getString(collection_prefix + ".quota_key", "");
configuration.database = config.getString(collection_prefix + ".database", "");
configuration.table = config.getString(collection_prefix + ".table", config.getString(collection_prefix + ".collection", ""));
configuration.schema = config.getString(collection_prefix + ".schema", "");
@ -169,6 +171,8 @@ std::optional<ExternalDataSourceInfo> getExternalDataSourceConfiguration(
configuration.username = arg_value.safeGet<String>();
else if (arg_name == "password")
configuration.password = arg_value.safeGet<String>();
else if (arg_name == "quota_key")
configuration.quota_key = arg_value.safeGet<String>();
else if (arg_name == "database")
configuration.database = arg_value.safeGet<String>();
else if (arg_name == "table")
@ -236,6 +240,7 @@ std::optional<ExternalDataSourceInfo> getExternalDataSourceConfiguration(
configuration.port = dict_config.getInt(dict_config_prefix + ".port", config.getUInt(collection_prefix + ".port", 0));
configuration.username = dict_config.getString(dict_config_prefix + ".user", config.getString(collection_prefix + ".user", ""));
configuration.password = dict_config.getString(dict_config_prefix + ".password", config.getString(collection_prefix + ".password", ""));
configuration.quota_key = dict_config.getString(dict_config_prefix + ".quota_key", config.getString(collection_prefix + ".quota_key", ""));
configuration.database = dict_config.getString(dict_config_prefix + ".db", config.getString(dict_config_prefix + ".database",
config.getString(collection_prefix + ".db", config.getString(collection_prefix + ".database", ""))));
configuration.table = dict_config.getString(dict_config_prefix + ".table", config.getString(collection_prefix + ".table", ""));
@ -328,6 +333,7 @@ ExternalDataSourcesByPriority getExternalDataSourceConfigurationByPriority(
common_configuration.port = dict_config.getUInt(dict_config_prefix + ".port", 0);
common_configuration.username = dict_config.getString(dict_config_prefix + ".user", "");
common_configuration.password = dict_config.getString(dict_config_prefix + ".password", "");
common_configuration.quota_key = dict_config.getString(dict_config_prefix + ".quota_key", "");
common_configuration.database = dict_config.getString(dict_config_prefix + ".db", dict_config.getString(dict_config_prefix + ".database", ""));
common_configuration.table = dict_config.getString(fmt::format("{}.table", dict_config_prefix), "");
common_configuration.schema = dict_config.getString(fmt::format("{}.schema", dict_config_prefix), "");
@ -359,6 +365,7 @@ ExternalDataSourcesByPriority getExternalDataSourceConfigurationByPriority(
replica_configuration.port = dict_config.getUInt(replica_name + ".port", common_configuration.port);
replica_configuration.username = dict_config.getString(replica_name + ".user", common_configuration.username);
replica_configuration.password = dict_config.getString(replica_name + ".password", common_configuration.password);
replica_configuration.quota_key = dict_config.getString(replica_name + ".quota_key", common_configuration.quota_key);
if (replica_configuration.host.empty() || replica_configuration.port == 0
|| replica_configuration.username.empty() || replica_configuration.password.empty())

View File

@ -19,6 +19,7 @@ struct ExternalDataSourceConfiguration
UInt16 port = 0;
String username = "default";
String password;
String quota_key;
String database;
String table;
String schema;

View File

@ -99,7 +99,7 @@ Pipe StorageHDFSCluster::read(
{
auto connection = std::make_shared<Connection>(
node.host_name, node.port, context->getGlobalContext()->getCurrentDatabase(),
node.user, node.password, node.cluster, node.cluster_secret,
node.user, node.password, node.quota_key, node.cluster, node.cluster_secret,
"HDFSClusterInititiator",
node.compression,
node.secure

View File

@ -129,7 +129,7 @@ Pipe StorageS3Cluster::read(
{
auto connection = std::make_shared<Connection>(
node.host_name, node.port, context->getGlobalContext()->getCurrentDatabase(),
node.user, node.password, node.cluster, node.cluster_secret,
node.user, node.password, node.quota_key, node.cluster, node.cluster_secret,
"S3ClusterInititiator",
node.compression,
node.secure