Merge pull request #34490 from Avogar/fix-client

Refactor client fault tolerant connection
This commit is contained in:
Kruglov Pavel 2022-02-16 12:39:46 +03:00 committed by GitHub
commit adf58ea1ce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 165 additions and 100 deletions

View File

@ -481,29 +481,25 @@ catch (...)
void Client::connect()
{
UInt16 default_port = ConnectionParameters::getPortFromConfig(config());
connection_parameters = ConnectionParameters(config(), hosts_ports[0].host,
hosts_ports[0].port.value_or(default_port));
String server_name;
UInt64 server_version_major = 0;
UInt64 server_version_minor = 0;
UInt64 server_version_patch = 0;
for (size_t attempted_address_index = 0; attempted_address_index < hosts_ports.size(); ++attempted_address_index)
for (size_t attempted_address_index = 0; attempted_address_index < hosts_and_ports.size(); ++attempted_address_index)
{
connection_parameters.host = hosts_ports[attempted_address_index].host;
connection_parameters.port = hosts_ports[attempted_address_index].port.value_or(default_port);
if (is_interactive)
std::cout << "Connecting to "
<< (!connection_parameters.default_database.empty() ? "database " + connection_parameters.default_database + " at "
: "")
<< connection_parameters.host << ":" << connection_parameters.port
<< (!connection_parameters.user.empty() ? " as user " + connection_parameters.user : "") << "." << std::endl;
try
{
connection_parameters
= ConnectionParameters(config(), hosts_and_ports[attempted_address_index].host, hosts_and_ports[attempted_address_index].port);
if (is_interactive)
std::cout << "Connecting to "
<< (!connection_parameters.default_database.empty() ? "database " + connection_parameters.default_database + " at "
: "")
<< connection_parameters.host << ":" << connection_parameters.port
<< (!connection_parameters.user.empty() ? " as user " + connection_parameters.user : "") << "." << std::endl;
connection = Connection::createConnection(connection_parameters, global_context);
if (max_client_network_bandwidth)
@ -535,7 +531,7 @@ void Client::connect()
}
else
{
if (attempted_address_index == hosts_ports.size() - 1)
if (attempted_address_index == hosts_and_ports.size() - 1)
throw;
if (is_interactive)
@ -994,11 +990,6 @@ void Client::addOptions(OptionsDescription & options_description)
/// Main commandline options related to client functionality and all parameters from Settings.
options_description.main_description->add_options()
("config,c", po::value<std::string>(), "config-file path (another shorthand)")
("host,h", po::value<std::vector<HostPort>>()->multitoken()->default_value({{"localhost"}}, "localhost"),
"list of server hosts with optionally assigned port to connect. List elements are separated by a space."
"Every list element looks like '<host>[:<port>]'. If port isn't assigned, connection is made by port from '--port' param"
"Example of usage: '-h host1:1 host2 host3:3'")
("port", po::value<int>()->default_value(9000), "server port, which is default port for every host from '--host' param")
("secure,s", "Use TLS connection")
("user,u", po::value<std::string>()->default_value("default"), "user")
/** If "--password [value]" is used but the value is omitted, the bad argument exception will be thrown.
@ -1044,12 +1035,24 @@ void Client::addOptions(OptionsDescription & options_description)
(
"types", po::value<std::string>(), "types"
);
/// Commandline options related to hosts and ports.
options_description.hosts_and_ports_description.emplace(createOptionsDescription("Hosts and ports options", terminal_width));
options_description.hosts_and_ports_description->add_options()
("host,h", po::value<String>()->default_value("localhost"),
"Server hostname. Multiple hosts can be passed via multiple arguments"
"Example of usage: '--host host1 --host host2 --port port2 --host host3 ...'"
"Each '--port port' will be attached to the last seen host that doesn't have a port yet,"
"if there is no such host, the port will be attached to the next first host or to default host.")
("port", po::value<UInt16>()->default_value(DBMS_DEFAULT_PORT), "server ports")
;
}
void Client::processOptions(const OptionsDescription & options_description,
const CommandLineOptions & options,
const std::vector<Arguments> & external_tables_arguments)
const std::vector<Arguments> & external_tables_arguments,
const std::vector<Arguments> & hosts_and_ports_arguments)
{
namespace po = boost::program_options;
@ -1081,6 +1084,25 @@ void Client::processOptions(const OptionsDescription & options_description,
exit(exit_code);
}
}
if (hosts_and_ports_arguments.empty())
{
hosts_and_ports.emplace_back(HostAndPort{"localhost", DBMS_DEFAULT_PORT});
}
else
{
for (const auto & hosts_and_ports_argument : hosts_and_ports_arguments)
{
/// Parse commandline options related to external tables.
po::parsed_options parsed_hosts_and_ports
= po::command_line_parser(hosts_and_ports_argument).options(options_description.hosts_and_ports_description.value()).run();
po::variables_map host_and_port_options;
po::store(parsed_hosts_and_ports, host_and_port_options);
hosts_and_ports.emplace_back(
HostAndPort{host_and_port_options["host"].as<std::string>(), host_and_port_options["port"].as<UInt16>()});
}
}
send_external_tables = true;
shared_context = Context::createShared();
@ -1105,12 +1127,8 @@ void Client::processOptions(const OptionsDescription & options_description,
if (options.count("config"))
config().setString("config-file", options["config"].as<std::string>());
if (options.count("host"))
hosts_ports = options["host"].as<std::vector<HostPort>>();
if (options.count("interleave-queries-file"))
interleave_queries_files = options["interleave-queries-file"].as<std::vector<std::string>>();
if (options.count("port") && !options["port"].defaulted())
config().setInt("port", options["port"].as<int>());
if (options.count("secure"))
config().setBool("secure", true);
if (options.count("user") && !options["user"].defaulted())

View File

@ -25,8 +25,11 @@ protected:
void printHelpMessage(const OptionsDescription & options_description) override;
void addOptions(OptionsDescription & options_description) override;
void processOptions(const OptionsDescription & options_description, const CommandLineOptions & options,
const std::vector<Arguments> & external_tables_arguments) override;
void processOptions(
const OptionsDescription & options_description,
const CommandLineOptions & options,
const std::vector<Arguments> & external_tables_arguments,
const std::vector<Arguments> & hosts_and_ports_arguments) override;
void processConfig() override;
private:

View File

@ -775,7 +775,7 @@ void LocalServer::applyCmdOptions(ContextMutablePtr context)
}
void LocalServer::processOptions(const OptionsDescription &, const CommandLineOptions & options, const std::vector<Arguments> &)
void LocalServer::processOptions(const OptionsDescription &, const CommandLineOptions & options, const std::vector<Arguments> &, const std::vector<Arguments> &)
{
if (options.count("table"))
config().setString("table-name", options["table"].as<std::string>());

View File

@ -41,7 +41,7 @@ protected:
void addOptions(OptionsDescription & options_description) override;
void processOptions(const OptionsDescription & options_description, const CommandLineOptions & options,
const std::vector<Arguments> &) override;
const std::vector<Arguments> &, const std::vector<Arguments> &) override;
void processConfig() override;
private:

View File

@ -1721,7 +1721,12 @@ void ClientBase::showClientVersion()
}
void ClientBase::readArguments(int argc, char ** argv, Arguments & common_arguments, std::vector<Arguments> & external_tables_arguments)
void ClientBase::readArguments(
int argc,
char ** argv,
Arguments & common_arguments,
std::vector<Arguments> & external_tables_arguments,
std::vector<Arguments> & hosts_and_ports_arguments)
{
/** We allow different groups of arguments:
* - common arguments;
@ -1732,6 +1737,10 @@ void ClientBase::readArguments(int argc, char ** argv, Arguments & common_argume
*/
bool in_external_group = false;
std::string prev_host_arg;
std::string prev_port_arg;
for (int arg_num = 1; arg_num < argc; ++arg_num)
{
const char * arg = argv[arg_num];
@ -1792,10 +1801,74 @@ void ClientBase::readArguments(int argc, char ** argv, Arguments & common_argume
query_parameters.emplace(String(param_continuation), String(arg));
}
}
else if (startsWith(arg, "--host") || startsWith(arg, "-h"))
{
std::string host_arg;
/// --host host
if (arg == "--host"sv || arg == "-h"sv)
{
++arg_num;
if (arg_num >= argc)
throw Exception("Host argument requires value", ErrorCodes::BAD_ARGUMENTS);
arg = argv[arg_num];
host_arg = "--host=";
host_arg.append(arg);
}
else
host_arg = arg;
/// --port port1 --host host1
if (!prev_port_arg.empty())
{
hosts_and_ports_arguments.push_back({host_arg, prev_port_arg});
prev_port_arg.clear();
}
else
{
/// --host host1 --host host2
if (!prev_host_arg.empty())
hosts_and_ports_arguments.push_back({prev_host_arg});
prev_host_arg = host_arg;
}
}
else if (startsWith(arg, "--port"))
{
std::string port_arg = arg;
/// --port port
if (arg == "--port"sv)
{
port_arg.push_back('=');
++arg_num;
if (arg_num >= argc)
throw Exception("Port argument requires value", ErrorCodes::BAD_ARGUMENTS);
arg = argv[arg_num];
port_arg.append(arg);
}
/// --host host1 --port port1
if (!prev_host_arg.empty())
{
hosts_and_ports_arguments.push_back({port_arg, prev_host_arg});
prev_host_arg.clear();
}
else
{
/// --port port1 --port port2
if (!prev_port_arg.empty())
hosts_and_ports_arguments.push_back({prev_port_arg});
prev_port_arg = port_arg;
}
}
else
common_arguments.emplace_back(arg);
}
}
if (!prev_host_arg.empty())
hosts_and_ports_arguments.push_back({prev_host_arg});
if (!prev_port_arg.empty())
hosts_and_ports_arguments.push_back({prev_port_arg});
}
void ClientBase::parseAndCheckOptions(OptionsDescription & options_description, po::variables_map & options, Arguments & arguments)
@ -1838,8 +1911,9 @@ void ClientBase::init(int argc, char ** argv)
Arguments common_arguments{""}; /// 0th argument is ignored.
std::vector<Arguments> external_tables_arguments;
std::vector<Arguments> hosts_and_ports_arguments;
readArguments(argc, argv, common_arguments, external_tables_arguments);
readArguments(argc, argv, common_arguments, external_tables_arguments, hosts_and_ports_arguments);
po::variables_map options;
OptionsDescription options_description;
@ -1929,7 +2003,7 @@ void ClientBase::init(int argc, char ** argv)
/// Output of help message.
if (options.count("help")
|| (options.count("host") && options["host"].as<std::vector<HostPort>>()[0].host == "elp")) /// If user writes -help instead of --help.
|| (options.count("host") && options["host"].as<std::string>() == "elp")) /// If user writes -help instead of --help.
{
printHelpMessage(options_description);
exit(0);
@ -1992,7 +2066,7 @@ void ClientBase::init(int argc, char ** argv)
profile_events.print = options.count("print-profile-events");
profile_events.delay_ms = options["profile-events-delay-ms"].as<UInt64>();
processOptions(options_description, options, external_tables_arguments);
processOptions(options_description, options, external_tables_arguments, hosts_and_ports_arguments);
argsToConfig(common_arguments, config(), 100);
clearPasswordFromCommandLine(argc, argv);

View File

@ -92,13 +92,15 @@ protected:
{
std::optional<ProgramOptionsDescription> main_description;
std::optional<ProgramOptionsDescription> external_description;
std::optional<ProgramOptionsDescription> hosts_and_ports_description;
};
virtual void printHelpMessage(const OptionsDescription & options_description) = 0;
virtual void addOptions(OptionsDescription & options_description) = 0;
virtual void processOptions(const OptionsDescription & options_description,
const CommandLineOptions & options,
const std::vector<Arguments> & external_tables_arguments) = 0;
const std::vector<Arguments> & external_tables_arguments,
const std::vector<Arguments> & hosts_and_ports_arguments) = 0;
virtual void processConfig() = 0;
protected:
@ -134,7 +136,12 @@ private:
void resetOutput();
void outputQueryInfo(bool echo_query_);
void readArguments(int argc, char ** argv, Arguments & common_arguments, std::vector<Arguments> & external_tables_arguments);
void readArguments(
int argc,
char ** argv,
Arguments & common_arguments,
std::vector<Arguments> & external_tables_arguments,
std::vector<Arguments> & hosts_and_ports_arguments);
void parseAndCheckOptions(OptionsDescription & options_description, po::variables_map & options, Arguments & arguments);
void updateSuggest(const ASTCreateQuery & ast_create);
@ -245,24 +252,13 @@ protected:
QueryProcessingStage::Enum query_processing_stage;
struct HostPort
struct HostAndPort
{
String host;
std::optional<UInt16> port{};
friend std::istream & operator>>(std::istream & in, HostPort & hostPort)
{
String host_with_port;
in >> host_with_port;
DB::DNSResolver & resolver = DB::DNSResolver::instance();
std::pair<Poco::Net::IPAddress, std::optional<UInt16>>
host_and_port = resolver.resolveHostOrAddress(host_with_port);
hostPort.host = host_and_port.first.toString();
hostPort.port = host_and_port.second;
return in;
}
UInt16 port;
};
std::vector<HostPort> hosts_ports{};
std::vector<HostAndPort> hosts_and_ports{};
};
}

View File

@ -202,45 +202,6 @@ Poco::Net::SocketAddress DNSResolver::resolveAddress(const std::string & host, U
return Poco::Net::SocketAddress(impl->cache_host(host).front(), port);
}
std::pair<Poco::Net::IPAddress, std::optional<UInt16>> DNSResolver::resolveHostOrAddress(const std::string & host_and_port)
{
Poco::Net::IPAddress ip;
size_t number_of_colons = std::count(host_and_port.begin(), host_and_port.end(), ':');
if (number_of_colons > 1)
{
/// IPv6 host
if (host_and_port.starts_with('['))
{
size_t close_bracket_pos = host_and_port.find(']');
assert(close_bracket_pos != std::string::npos);
ip = resolveHost(host_and_port.substr(0, close_bracket_pos));
if (close_bracket_pos == host_and_port.size() - 1)
return {ip, std::nullopt};
if (host_and_port[close_bracket_pos + 1] != ':')
throw Exception("Missing delimiter between host and port", ErrorCodes::BAD_ARGUMENTS);
unsigned int port;
if (!Poco::NumberParser::tryParseUnsigned(host_and_port.substr(close_bracket_pos + 2), port))
throw Exception("Port must be numeric", ErrorCodes::BAD_ARGUMENTS);
if (port > 0xFFFF)
throw Exception("Port must be less 0xFFFF", ErrorCodes::BAD_ARGUMENTS);
return {ip, port};
}
return {resolveHost(host_and_port), std::nullopt};
}
else if (number_of_colons == 1)
{
/// IPv4 host with port
Poco::Net::SocketAddress socket = resolveAddress(host_and_port);
return {socket.host(), socket.port()};
}
/// IPv4 host
return {resolveHost(host_and_port), std::nullopt};
}
String DNSResolver::reverseResolve(const Poco::Net::IPAddress & address)
{
if (impl->disable_cache)

View File

@ -34,10 +34,6 @@ public:
Poco::Net::SocketAddress resolveAddress(const std::string & host, UInt16 port);
/// Accepts host names like 'example.com'/'example.com:port' or '127.0.0.1'/'127.0.0.1:port' or '::1'/'[::1]:port'
/// and resolves its IP and port, if port is set
std::pair<Poco::Net::IPAddress, std::optional<UInt16>> resolveHostOrAddress(const std::string & host_and_port);
/// Accepts host IP and resolves its host name
String reverseResolve(const Poco::Net::IPAddress & address);

View File

@ -9,3 +9,10 @@
1
1
1
1
1
1
1
1
1
1

View File

@ -14,24 +14,24 @@ not_resolvable_host="notlocalhost"
exception_msg="Cannot resolve host (${not_resolvable_host}), error 0: ${not_resolvable_host}.
Code: 198. DB::Exception: Not found address of host: ${not_resolvable_host}. (DNS_ERROR)
"
error="$(${CLICKHOUSE_CLIENT} --host "${CLICKHOUSE_HOST}" "${not_resolvable_host}" --query "SELECT 1" 2>&1 > /dev/null)";
error="$(${CLICKHOUSE_CLIENT} --host "${CLICKHOUSE_HOST}" --host "${not_resolvable_host}" --query "SELECT 1" 2>&1 > /dev/null)";
[ "${error}" == "${exception_msg}" ]; echo "$?"
not_number_port="abc"
exception_msg="Bad arguments: the argument ('${CLICKHOUSE_HOST}:${not_number_port}') for option '--host' is invalid."
error="$(${CLICKHOUSE_CLIENT} --host "${CLICKHOUSE_HOST}:${not_number_port}" --query "SELECT 1" 2>&1 > /dev/null)";
error="$(${CLICKHOUSE_CLIENT} --host "${CLICKHOUSE_HOST}" --port "${not_number_port}" --query "SELECT 1" 2>&1 > /dev/null)";
[ "${error}" == "${exception_msg}" ]; echo "$?"
not_alive_host="10.100.0.0"
${CLICKHOUSE_CLIENT} --host "${not_alive_host}" "${CLICKHOUSE_HOST}" --query "SELECT 1";
${CLICKHOUSE_CLIENT} --host "${not_alive_host}" --host "${CLICKHOUSE_HOST}" --query "SELECT 1";
not_alive_port="1"
exception_msg="Code: 210. DB::NetException: Connection refused (${CLICKHOUSE_HOST}:${not_alive_port}). (NETWORK_ERROR)
"
error="$(${CLICKHOUSE_CLIENT} --host "${CLICKHOUSE_HOST}" --port "${not_alive_port}" --query "SELECT 1" 2>&1 > /dev/null)"
[ "${error}" == "${exception_msg}" ]; echo "$?"
${CLICKHOUSE_CLIENT} --host "${CLICKHOUSE_HOST}:${not_alive_port}" "${CLICKHOUSE_HOST}" --query "SELECT 1";
${CLICKHOUSE_CLIENT} --host "${CLICKHOUSE_HOST}:${CLICKHOUSE_PORT_TCP}" --port "${not_alive_port}" --query "SELECT 1";
${CLICKHOUSE_CLIENT} --host "${CLICKHOUSE_HOST}" --port "${not_alive_port}" --host "${CLICKHOUSE_HOST}" --query "SELECT 1";
${CLICKHOUSE_CLIENT} --host "${CLICKHOUSE_HOST}" --port "${CLICKHOUSE_PORT_TCP}" --port "${not_alive_port}" --query "SELECT 1";
ipv6_host_without_brackets="2001:3984:3989::1:1000"
exception_msg="Code: 210. DB::NetException: Connection refused (${ipv6_host_without_brackets}). (NETWORK_ERROR)
@ -47,5 +47,15 @@ error="$(${CLICKHOUSE_CLIENT} --host "${ipv6_host_with_brackets}" --query "SELEC
exception_msg="Code: 210. DB::NetException: Connection refused (${ipv6_host_with_brackets}:${not_alive_port}). (NETWORK_ERROR)
"
error="$(${CLICKHOUSE_CLIENT} --host "${ipv6_host_with_brackets}:${not_alive_port}" --query "SELECT 1" 2>&1 > /dev/null)"
error="$(${CLICKHOUSE_CLIENT} --host "${ipv6_host_with_brackets}" --port "${not_alive_port}" --query "SELECT 1" 2>&1 > /dev/null)"
[ "${error}" == "${exception_msg}" ]; echo "$?"
${CLICKHOUSE_CLIENT} --query "SELECT 1";
${CLICKHOUSE_CLIENT} --port "${CLICKHOUSE_PORT_TCP}" --query "SELECT 1";
${CLICKHOUSE_CLIENT} --host "${CLICKHOUSE_HOST}" --query "SELECT 1";
${CLICKHOUSE_CLIENT} --port "${CLICKHOUSE_PORT_TCP}" --host "${CLICKHOUSE_HOST}" --query "SELECT 1";
${CLICKHOUSE_CLIENT} --port "${CLICKHOUSE_PORT_TCP}" --host "${CLICKHOUSE_HOST}" --host "{$not_alive_host}" --port "${CLICKHOUSE_PORT_TCP}" --query "SELECT 1";
${CLICKHOUSE_CLIENT} --port "${CLICKHOUSE_PORT_TCP}" --host "{$not_alive_host}" --host "${CLICKHOUSE_HOST}" --query "SELECT 1" 2> /dev/null;
${CLICKHOUSE_CLIENT} --port "${CLICKHOUSE_PORT_TCP}" --port "${CLICKHOUSE_PORT_TCP}" --port "${CLICKHOUSE_PORT_TCP}" --host "{$not_alive_host}" --host "${CLICKHOUSE_HOST}" --query "SELECT 1";