add config parameters and client arguments, make default notchunked_optional

This commit is contained in:
Yakov Olkhovskiy 2024-05-23 22:01:32 +00:00
parent 99bd796011
commit 94bc0a1e96
16 changed files with 240 additions and 10 deletions

View File

@ -75,6 +75,8 @@ public:
const String & default_database_,
const String & user_,
const String & password_,
const String & proto_send_chunked_,
const String & proto_recv_chunked_,
const String & quota_key_,
const String & stage,
bool randomize_,
@ -128,7 +130,9 @@ public:
connections.emplace_back(std::make_unique<ConnectionPool>(
concurrency,
cur_host, cur_port,
default_database_, user_, password_, quota_key_,
default_database_, user_, password_,
proto_send_chunked_, proto_recv_chunked_,
quota_key_,
/* cluster_= */ "",
/* cluster_secret_= */ "",
/* client_name_= */ std::string(DEFAULT_CLIENT_NAME),
@ -662,6 +666,50 @@ int mainEntryClickHouseBenchmark(int argc, char ** argv)
Strings hosts = options.count("host") ? options["host"].as<Strings>() : Strings({"localhost"});
String proto_send_chunked {"notchunked_optional"};
String proto_recv_chunked {"notchunked_optional"};
if (options.count("proto_caps"))
{
std::string proto_caps_str = options["proto_caps"].as<std::string>();
std::vector<std::string_view> proto_caps;
splitInto<','>(proto_caps, proto_caps_str);
for (auto cap_str : proto_caps)
{
std::string direction;
if (cap_str.starts_with("send_"))
{
direction = "send";
cap_str = cap_str.substr(std::string_view("send_").size());
}
else if (cap_str.starts_with("recv_"))
{
direction = "recv";
cap_str = cap_str.substr(std::string_view("recv_").size());
}
if (cap_str != "chunked" && cap_str != "notchunked" && cap_str != "chunked_optional" && cap_str != "notchunked_optional")
throw Exception(ErrorCodes::BAD_ARGUMENTS, "proto_caps option is incorrect ({})", proto_caps_str);
if (direction.empty())
{
proto_send_chunked = cap_str;
proto_recv_chunked = cap_str;
}
else
{
if (direction == "send")
proto_send_chunked = cap_str;
else
proto_recv_chunked = cap_str;
}
}
}
Benchmark benchmark(
options["concurrency"].as<unsigned>(),
options["delay"].as<double>(),
@ -673,6 +721,8 @@ int mainEntryClickHouseBenchmark(int argc, char ** argv)
options["database"].as<std::string>(),
options["user"].as<std::string>(),
options["password"].as<std::string>(),
proto_send_chunked,
proto_recv_chunked,
options["quota_key"].as<std::string>(),
options["stage"].as<std::string>(),
options.count("randomize"),

View File

@ -75,9 +75,11 @@
#include <limits>
#include <map>
#include <memory>
#include <string_view>
#include <unordered_map>
#include <Common/config_version.h>
#include <base/find_symbols.h>
#include "config.h"
namespace fs = std::filesystem;
@ -2993,6 +2995,8 @@ void ClientBase::init(int argc, char ** argv)
("config-file,C", po::value<std::string>(), "config-file path")
("proto_caps", po::value<std::string>(), "enable/disable chunked protocol: chunked_optional, notchunked, notchunked_optional, send_chunked, send_chunked_optional, send_notchunked, send_notchunked_optional, recv_chunked, recv_chunked_optional, recv_notchunked, recv_notchunked_optional")
("query,q", po::value<std::vector<std::string>>()->multitoken(), R"(query; can be specified multiple times (--query "SELECT 1" --query "SELECT 2"...))")
("queries-file", po::value<std::vector<std::string>>()->multitoken(), "file path with queries to execute; multiple files can be specified (--queries-file file1 file2...)")
("multiquery,n", "If specified, multiple queries separated by semicolons can be listed after --query. For convenience, it is also possible to omit --query and pass the queries directly after --multiquery.")
@ -3162,6 +3166,41 @@ void ClientBase::init(int argc, char ** argv)
if (options.count("server_logs_file"))
server_logs_file = options["server_logs_file"].as<std::string>();
if (options.count("proto_caps"))
{
std::string proto_caps_str = options["proto_caps"].as<std::string>();
std::vector<std::string_view> proto_caps;
splitInto<','>(proto_caps, proto_caps_str);
for (auto cap_str : proto_caps)
{
std::string direction;
if (cap_str.starts_with("send_"))
{
direction = "send";
cap_str = cap_str.substr(std::string_view("send_").size());
}
else if (cap_str.starts_with("recv_"))
{
direction = "recv";
cap_str = cap_str.substr(std::string_view("recv_").size());
}
if (cap_str != "chunked" && cap_str != "notchunked" && cap_str != "chunked_optional" && cap_str != "notchunked_optional")
throw Exception(ErrorCodes::BAD_ARGUMENTS, "proto_caps option is incorrect ({})", proto_caps_str);
if (direction.empty())
{
config().setString("proto_caps.send", std::string(cap_str));
config().setString("proto_caps.recv", std::string(cap_str));
}
else
config().setString("proto_caps." + direction, std::string(cap_str));
}
}
query_processing_stage = QueryProcessingStage::fromString(options["stage"].as<std::string>());
query_kind = parseQueryKind(options["query_kind"].as<std::string>());
profile_events.print = options.count("print-profile-events");

View File

@ -71,6 +71,7 @@ Connection::~Connection() = default;
Connection::Connection(const String & host_, UInt16 port_,
const String & default_database_,
const String & user_, const String & password_,
const String & proto_send_chunked_, const String & proto_recv_chunked_,
[[maybe_unused]] const SSHKey & ssh_private_key_,
const String & quota_key_,
const String & cluster_,
@ -80,6 +81,7 @@ Connection::Connection(const String & host_, UInt16 port_,
Protocol::Secure secure_)
: host(host_), port(port_), default_database(default_database_)
, user(user_), password(password_)
, proto_send_chunked(proto_send_chunked_), proto_recv_chunked(proto_recv_chunked_)
#if USE_SSH
, ssh_private_key(ssh_private_key_)
#endif
@ -206,13 +208,46 @@ void Connection::connect(const ConnectionTimeouts & timeouts)
sendHello();
receiveHello(timeouts.handshake_timeout);
bool out_chunked = false;
bool in_chunked = false;
if (server_revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_CHUNKED_PACKETS)
{
auto is_chunked = [](const String & chunked_srv_str, const String & chunked_cl_str, const String & direction)
{
bool chunked_srv = chunked_srv_str.starts_with("chunked");
bool optional_srv = chunked_srv_str.ends_with("_optional");
bool chunked_cl = chunked_cl_str.starts_with("chunked");
bool optional_cl = chunked_cl_str.ends_with("_optional");
if (optional_srv)
return chunked_cl;
if (optional_cl)
return chunked_srv;
if (chunked_cl != chunked_srv)
throw NetException(
ErrorCodes::NETWORK_ERROR,
"Incompatible protocol: {} set to {}, server requires {}",
direction,
chunked_cl ? "chunked" : "notchunked",
chunked_srv ? "chunked" : "notchunked");
return chunked_srv;
};
out_chunked = is_chunked(proto_recv_chunked_srv, proto_send_chunked, "send");
in_chunked = is_chunked(proto_send_chunked_srv, proto_recv_chunked, "recv");
}
if (server_revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_ADDENDUM)
sendAddendum();
if (server_revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_CHUNKED_PACKETS)
{
in->enableChunked();
out->enableChunked();
if (out_chunked)
out->enableChunked();
if (in_chunked)
in->enableChunked();
}
LOG_TRACE(log_wrapper.get(), "Connected to {} server version {}.{}.{}.",
@ -359,6 +394,13 @@ void Connection::sendAddendum()
{
if (server_revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_QUOTA_KEY)
writeStringBinary(quota_key, *out);
if (server_revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_CHUNKED_PACKETS)
{
writeStringBinary(proto_send_chunked, *out);
writeStringBinary(proto_recv_chunked, *out);
}
out->next();
}
@ -438,6 +480,12 @@ void Connection::receiveHello(const Poco::Timespan & handshake_timeout)
else
server_version_patch = server_revision;
if (server_revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_CHUNKED_PACKETS)
{
readStringBinary(proto_send_chunked_srv, *in);
readStringBinary(proto_recv_chunked_srv, *in);
}
if (server_revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_PASSWORD_COMPLEXITY_RULES)
{
UInt64 rules_size;
@ -1327,6 +1375,8 @@ ServerConnectionPtr Connection::createConnection(const ConnectionParameters & pa
parameters.default_database,
parameters.user,
parameters.password,
parameters.proto_send_chunked,
parameters.proto_recv_chunked,
parameters.ssh_private_key,
parameters.quota_key,
"", /* cluster */

View File

@ -52,6 +52,7 @@ public:
Connection(const String & host_, UInt16 port_,
const String & default_database_,
const String & user_, const String & password_,
const String & proto_send_chunked_, const String & proto_recv_chunked_,
const SSHKey & ssh_private_key_,
const String & quota_key_,
const String & cluster_,
@ -169,6 +170,10 @@ private:
String default_database;
String user;
String password;
String proto_send_chunked;
String proto_recv_chunked;
String proto_send_chunked_srv;
String proto_recv_chunked_srv;
#if USE_SSH
SSHKey ssh_private_key;
#endif

View File

@ -103,6 +103,9 @@ ConnectionParameters::ConnectionParameters(const Poco::Util::AbstractConfigurati
#endif
}
proto_send_chunked = config.getString("proto_caps.send", "notchunked_optional");
proto_recv_chunked = config.getString("proto_caps.recv", "notchunked_optional");
quota_key = config.getString("quota_key", "");
/// By default compression is disabled if address looks like localhost.

View File

@ -20,6 +20,8 @@ struct ConnectionParameters
std::string default_database;
std::string user;
std::string password;
std::string proto_send_chunked;
std::string proto_recv_chunked;
std::string quota_key;
SSHKey ssh_private_key;
Protocol::Secure security = Protocol::Secure::Disable;

View File

@ -12,6 +12,8 @@ ConnectionPoolPtr ConnectionPoolFactory::get(
String default_database,
String user,
String password,
String proto_send_chunked,
String proto_recv_chunked,
String quota_key,
String cluster,
String cluster_secret,
@ -21,7 +23,7 @@ ConnectionPoolPtr ConnectionPoolFactory::get(
Priority priority)
{
Key key{
max_connections, host, port, default_database, user, password, quota_key, cluster, cluster_secret, client_name, compression, secure, priority};
max_connections, host, port, default_database, user, password, proto_send_chunked, proto_recv_chunked, quota_key, cluster, cluster_secret, client_name, compression, secure, priority};
std::lock_guard lock(mutex);
auto [it, inserted] = pools.emplace(key, ConnectionPoolPtr{});
@ -38,6 +40,8 @@ ConnectionPoolPtr ConnectionPoolFactory::get(
default_database,
user,
password,
proto_send_chunked,
proto_recv_chunked,
quota_key,
cluster,
cluster_secret,

View File

@ -72,6 +72,8 @@ public:
const String & default_database_,
const String & user_,
const String & password_,
const String & proto_send_chunked_,
const String & proto_recv_chunked_,
const String & quota_key_,
const String & cluster_,
const String & cluster_secret_,
@ -84,6 +86,8 @@ public:
, default_database(default_database_)
, user(user_)
, password(password_)
, proto_send_chunked(proto_send_chunked_)
, proto_recv_chunked(proto_recv_chunked_)
, quota_key(quota_key_)
, cluster(cluster_)
, cluster_secret(cluster_secret_)
@ -123,7 +127,9 @@ protected:
{
return std::make_shared<Connection>(
host, port,
default_database, user, password, SSHKey(), quota_key,
default_database, user, password,
proto_send_chunked, proto_recv_chunked,
SSHKey(), quota_key,
cluster, cluster_secret,
client_name, compression, secure);
}
@ -132,6 +138,8 @@ private:
String default_database;
String user;
String password;
String proto_send_chunked;
String proto_recv_chunked;
String quota_key;
/// For inter-server authorization
@ -157,6 +165,8 @@ public:
String default_database;
String user;
String password;
String proto_send_chunked;
String proto_recv_chunked;
String quota_key;
String cluster;
String cluster_secret;
@ -180,6 +190,8 @@ public:
String default_database,
String user,
String password,
String proto_send_chunked,
String proto_recv_chunked,
String quota_key,
String cluster,
String cluster_secret,
@ -197,6 +209,7 @@ inline bool operator==(const ConnectionPoolFactory::Key & lhs, const ConnectionP
{
return lhs.max_connections == rhs.max_connections && lhs.host == rhs.host && lhs.port == rhs.port
&& lhs.default_database == rhs.default_database && lhs.user == rhs.user && lhs.password == rhs.password
&& lhs.proto_send_chunked == rhs.proto_send_chunked && lhs.proto_recv_chunked == rhs.proto_recv_chunked
&& lhs.quota_key == rhs.quota_key
&& lhs.cluster == rhs.cluster && lhs.cluster_secret == rhs.cluster_secret && lhs.client_name == rhs.client_name
&& lhs.compression == rhs.compression && lhs.secure == rhs.secure && lhs.priority == rhs.priority;

View File

@ -51,6 +51,8 @@ namespace
configuration.db,
configuration.user,
configuration.password,
configuration.proto_send_chunked,
configuration.proto_recv_chunked,
configuration.quota_key,
"", /* cluster */
"", /* cluster_secret */
@ -222,7 +224,7 @@ void registerDictionarySourceClickHouse(DictionarySourceFactory & factory)
{
validateNamedCollection(
*named_collection, {}, ValidateKeysMultiset<ExternalDatabaseEqualKeysSet>{
"secure", "host", "hostname", "port", "user", "username", "password", "quota_key", "name",
"secure", "host", "hostname", "port", "user", "username", "password", "proto_send_chunked", "proto_recv_chunked", "quota_key", "name",
"db", "database", "table","query", "where", "invalidate_query", "update_field", "update_lag"});
const auto secure = named_collection->getOrDefault("secure", false);
@ -234,6 +236,8 @@ void registerDictionarySourceClickHouse(DictionarySourceFactory & factory)
.host = host,
.user = named_collection->getAnyOrDefault<String>({"user", "username"}, "default"),
.password = named_collection->getOrDefault<String>("password", ""),
.proto_send_chunked = named_collection->getOrDefault<String>("proto_send_chunked", "notchunked_optional"),
.proto_recv_chunked = named_collection->getOrDefault<String>("proto_recv_chunked", "notchunked_optional"),
.quota_key = named_collection->getOrDefault<String>("quota_key", ""),
.db = named_collection->getAnyOrDefault<String>({"db", "database"}, default_database),
.table = named_collection->getOrDefault<String>("table", ""),
@ -258,6 +262,8 @@ void registerDictionarySourceClickHouse(DictionarySourceFactory & factory)
.host = host,
.user = config.getString(settings_config_prefix + ".user", "default"),
.password = config.getString(settings_config_prefix + ".password", ""),
.proto_send_chunked = config.getString(settings_config_prefix + ".proto_caps.send", "notchunked_optional"),
.proto_recv_chunked = config.getString(settings_config_prefix + ".proto_caps.recv", "notchunked_optional"),
.quota_key = config.getString(settings_config_prefix + ".quota_key", ""),
.db = config.getString(settings_config_prefix + ".db", default_database),
.table = config.getString(settings_config_prefix + ".table", ""),

View File

@ -23,6 +23,8 @@ public:
const std::string host;
const std::string user;
const std::string password;
const std::string proto_send_chunked;
const std::string proto_recv_chunked;
const std::string quota_key;
const std::string db;
const std::string table;

View File

@ -113,6 +113,9 @@ Cluster::Address::Address(
secure = ConfigHelper::getBool(config, config_prefix + ".secure", false, /* empty_as */true) ? Protocol::Secure::Enable : Protocol::Secure::Disable;
priority = Priority{config.getInt(config_prefix + ".priority", 1)};
proto_send_chunked = config.getString(config_prefix + ".proto_caps.send", "notchunked_optional");
proto_recv_chunked = config.getString(config_prefix + ".proto_caps.recv", "notchunked_optional");
const char * port_type = secure == Protocol::Secure::Enable ? "tcp_port_secure" : "tcp_port";
auto default_port = config.getInt(port_type, 0);
@ -425,7 +428,9 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config,
auto pool = ConnectionPoolFactory::instance().get(
static_cast<unsigned>(settings.distributed_connections_pool_size),
address.host_name, address.port,
address.default_database, address.user, address.password, address.quota_key,
address.default_database, address.user, address.password,
address.proto_send_chunked, address.proto_recv_chunked,
address.quota_key,
address.cluster, address.cluster_secret,
"server", address.compression,
address.secure, address.priority);
@ -589,6 +594,8 @@ void Cluster::addShard(
replica.default_database,
replica.user,
replica.password,
replica.proto_send_chunked,
replica.proto_recv_chunked,
replica.quota_key,
replica.cluster,
replica.cluster_secret,
@ -744,6 +751,8 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti
address.default_database,
address.user,
address.password,
address.proto_send_chunked,
address.proto_recv_chunked,
address.quota_key,
address.cluster,
address.cluster_secret,

View File

@ -114,6 +114,8 @@ public:
UInt16 port{0};
String user;
String password;
String proto_send_chunked;
String proto_recv_chunked;
String quota_key;
/// For inter-server authorization

View File

@ -1,6 +1,7 @@
#include "Interpreters/AsynchronousInsertQueue.h"
#include "Interpreters/SquashingTransform.h"
#include "Parsers/ASTInsertQuery.h"
#include <base/find_symbols.h>
#include <algorithm>
#include <exception>
#include <iterator>
@ -99,6 +100,7 @@ namespace DB::ErrorCodes
extern const int SUPPORT_IS_DISABLED;
extern const int UNSUPPORTED_METHOD;
extern const int USER_EXPIRED;
extern const int NETWORK_ERROR;
}
namespace
@ -279,8 +281,35 @@ void TCPHandler::runImpl()
if (client_tcp_protocol_version >= DBMS_MIN_PROTOCOL_VERSION_WITH_CHUNKED_PACKETS)
{
in->enableChunked();
out->enableChunked();
auto is_chunked = [](const String & chunked_srv_str, const String & chunked_cl_str, const String & direction)
{
bool chunked_srv = chunked_srv_str.starts_with("chunked");
bool optional_srv = chunked_srv_str.ends_with("_optional");
bool chunked_cl = chunked_cl_str.starts_with("chunked");
bool optional_cl = chunked_cl_str.ends_with("_optional");
if (optional_srv)
return chunked_cl;
if (optional_cl)
return chunked_srv;
if (chunked_cl != chunked_srv)
throw NetException(
ErrorCodes::NETWORK_ERROR,
"Incompatible protocol: {} is {}, client requested {}",
direction,
chunked_srv ? "chunked" : "notchunked",
chunked_cl ? "chunked" : "notchunked");
return chunked_srv;
};
bool out_chunked = is_chunked(server.config().getString("proto_caps.send", "notchunked_optional"), proto_recv_chunked_cl, "send");
bool in_chunked = is_chunked(server.config().getString("proto_caps.recv", "notchunked_optional"), proto_send_chunked_cl, "recv");
if (out_chunked)
out->enableChunked();
if (in_chunked)
in->enableChunked();
}
if (!is_interserver_mode)
@ -1575,6 +1604,12 @@ void TCPHandler::receiveAddendum()
if (!is_interserver_mode)
session->setQuotaClientKey(quota_key);
if (client_tcp_protocol_version >= DBMS_MIN_PROTOCOL_VERSION_WITH_CHUNKED_PACKETS)
{
readStringBinary(proto_send_chunked_cl, *in);
readStringBinary(proto_recv_chunked_cl, *in);
}
}
@ -1608,6 +1643,11 @@ void TCPHandler::sendHello()
writeStringBinary(server_display_name, *out);
if (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_VERSION_PATCH)
writeVarUInt(VERSION_PATCH, *out);
if (client_tcp_protocol_version >= DBMS_MIN_PROTOCOL_VERSION_WITH_CHUNKED_PACKETS)
{
writeStringBinary(server.config().getString("proto_caps.send", "notchunked_optional"), *out);
writeStringBinary(server.config().getString("proto_caps.recv", "notchunked_optional"), *out);
}
if (client_tcp_protocol_version >= DBMS_MIN_PROTOCOL_VERSION_WITH_PASSWORD_COMPLEXITY_RULES)
{
auto rules = server.context()->getAccessControl().getPasswordComplexityRules();

View File

@ -188,6 +188,8 @@ private:
UInt64 client_version_minor = 0;
UInt64 client_version_patch = 0;
UInt32 client_tcp_protocol_version = 0;
String proto_send_chunked_cl;
String proto_recv_chunked_cl;
String quota_key;
/// Connection settings, which are extracted from a context.

View File

@ -273,6 +273,8 @@ ConnectionPoolWithFailoverPtr DistributedAsyncInsertDirectoryQueue::createPool(c
address.default_database,
address.user,
address.password,
address.proto_send_chunked,
address.proto_recv_chunked,
address.quota_key,
address.cluster,
address.cluster_secret,

View File

@ -5664,7 +5664,8 @@ std::optional<QueryPipeline> StorageReplicatedMergeTree::distributedWriteFromClu
{
auto connection = std::make_shared<Connection>(
node.host_name, node.port, query_context->getGlobalContext()->getCurrentDatabase(),
node.user, node.password, SSHKey(), node.quota_key, node.cluster, node.cluster_secret,
node.user, node.password, node.proto_send_chunked, node.proto_recv_chunked,
SSHKey(), node.quota_key, node.cluster, node.cluster_secret,
"ParallelInsertSelectInititiator",
node.compression,
node.secure