mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 15:42:02 +00:00
Merge pull request #33824 from ManagedDatabases/client-fault-tolerant-connection
Client fault tolerant connection
This commit is contained in:
commit
4ec0f6f091
@ -481,48 +481,76 @@ catch (...)
|
||||
|
||||
void Client::connect()
|
||||
{
|
||||
connection_parameters = ConnectionParameters(config());
|
||||
|
||||
if (is_interactive)
|
||||
std::cout << "Connecting to "
|
||||
<< (!connection_parameters.default_database.empty() ? "database " + connection_parameters.default_database + " at "
|
||||
: "")
|
||||
<< connection_parameters.host << ":" << connection_parameters.port
|
||||
<< (!connection_parameters.user.empty() ? " as user " + connection_parameters.user : "") << "." << std::endl;
|
||||
UInt16 default_port = ConnectionParameters::getPortFromConfig(config());
|
||||
connection_parameters = ConnectionParameters(config(), hosts_ports[0].host,
|
||||
hosts_ports[0].port.value_or(default_port));
|
||||
|
||||
String server_name;
|
||||
UInt64 server_version_major = 0;
|
||||
UInt64 server_version_minor = 0;
|
||||
UInt64 server_version_patch = 0;
|
||||
|
||||
try
|
||||
for (size_t attempted_address_index = 0; attempted_address_index < hosts_ports.size(); ++attempted_address_index)
|
||||
{
|
||||
connection = Connection::createConnection(connection_parameters, global_context);
|
||||
connection_parameters.host = hosts_ports[attempted_address_index].host;
|
||||
connection_parameters.port = hosts_ports[attempted_address_index].port.value_or(default_port);
|
||||
|
||||
if (max_client_network_bandwidth)
|
||||
if (is_interactive)
|
||||
std::cout << "Connecting to "
|
||||
<< (!connection_parameters.default_database.empty() ? "database " + connection_parameters.default_database + " at "
|
||||
: "")
|
||||
<< connection_parameters.host << ":" << connection_parameters.port
|
||||
<< (!connection_parameters.user.empty() ? " as user " + connection_parameters.user : "") << "." << std::endl;
|
||||
|
||||
try
|
||||
{
|
||||
ThrottlerPtr throttler = std::make_shared<Throttler>(max_client_network_bandwidth, 0, "");
|
||||
connection->setThrottler(throttler);
|
||||
}
|
||||
connection = Connection::createConnection(connection_parameters, global_context);
|
||||
|
||||
connection->getServerVersion(
|
||||
connection_parameters.timeouts, server_name, server_version_major, server_version_minor, server_version_patch, server_revision);
|
||||
}
|
||||
catch (const Exception & e)
|
||||
{
|
||||
/// It is typical when users install ClickHouse, type some password and instantly forget it.
|
||||
if ((connection_parameters.user.empty() || connection_parameters.user == "default")
|
||||
&& e.code() == DB::ErrorCodes::AUTHENTICATION_FAILED)
|
||||
if (max_client_network_bandwidth)
|
||||
{
|
||||
ThrottlerPtr throttler = std::make_shared<Throttler>(max_client_network_bandwidth, 0, "");
|
||||
connection->setThrottler(throttler);
|
||||
}
|
||||
|
||||
connection->getServerVersion(
|
||||
connection_parameters.timeouts, server_name, server_version_major, server_version_minor, server_version_patch, server_revision);
|
||||
config().setString("host", connection_parameters.host);
|
||||
config().setInt("port", connection_parameters.port);
|
||||
break;
|
||||
}
|
||||
catch (const Exception & e)
|
||||
{
|
||||
std::cerr << std::endl
|
||||
<< "If you have installed ClickHouse and forgot password you can reset it in the configuration file." << std::endl
|
||||
<< "The password for default user is typically located at /etc/clickhouse-server/users.d/default-password.xml" << std::endl
|
||||
<< "and deleting this file will reset the password." << std::endl
|
||||
<< "See also /etc/clickhouse-server/users.xml on the server where ClickHouse is installed." << std::endl
|
||||
<< std::endl;
|
||||
}
|
||||
/// It is typical when users install ClickHouse, type some password and instantly forget it.
|
||||
/// This problem can't be fixed with reconnection so it is not attempted
|
||||
if ((connection_parameters.user.empty() || connection_parameters.user == "default")
|
||||
&& e.code() == DB::ErrorCodes::AUTHENTICATION_FAILED)
|
||||
{
|
||||
std::cerr << std::endl
|
||||
<< "If you have installed ClickHouse and forgot password you can reset it in the configuration file." << std::endl
|
||||
<< "The password for default user is typically located at /etc/clickhouse-server/users.d/default-password.xml" << std::endl
|
||||
<< "and deleting this file will reset the password." << std::endl
|
||||
<< "See also /etc/clickhouse-server/users.xml on the server where ClickHouse is installed." << std::endl
|
||||
<< std::endl;
|
||||
throw;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (attempted_address_index == hosts_ports.size() - 1)
|
||||
throw;
|
||||
|
||||
throw;
|
||||
if (is_interactive)
|
||||
{
|
||||
std::cerr << "Connection attempt to database at "
|
||||
<< connection_parameters.host << ":" << connection_parameters.port
|
||||
<< " resulted in failure"
|
||||
<< std::endl
|
||||
<< getExceptionMessage(e, false)
|
||||
<< std::endl
|
||||
<< "Attempting connection to the next provided address"
|
||||
<< std::endl;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
server_version = toString(server_version_major) + "." + toString(server_version_minor) + "." + toString(server_version_patch);
|
||||
@ -966,8 +994,11 @@ void Client::addOptions(OptionsDescription & options_description)
|
||||
/// Main commandline options related to client functionality and all parameters from Settings.
|
||||
options_description.main_description->add_options()
|
||||
("config,c", po::value<std::string>(), "config-file path (another shorthand)")
|
||||
("host,h", po::value<std::string>()->default_value("localhost"), "server host")
|
||||
("port", po::value<int>()->default_value(9000), "server port")
|
||||
("host,h", po::value<std::vector<HostPort>>()->multitoken()->default_value({{"localhost"}}, "localhost"),
|
||||
"list of server hosts with optionally assigned port to connect. List elements are separated by a space."
|
||||
"Every list element looks like '<host>[:<port>]'. If port isn't assigned, connection is made by port from '--port' param"
|
||||
"Example of usage: '-h host1:1 host2 host3:3'")
|
||||
("port", po::value<int>()->default_value(9000), "server port, which is default port for every host from '--host' param")
|
||||
("secure,s", "Use TLS connection")
|
||||
("user,u", po::value<std::string>()->default_value("default"), "user")
|
||||
/** If "--password [value]" is used but the value is omitted, the bad argument exception will be thrown.
|
||||
@ -1074,8 +1105,8 @@ void Client::processOptions(const OptionsDescription & options_description,
|
||||
|
||||
if (options.count("config"))
|
||||
config().setString("config-file", options["config"].as<std::string>());
|
||||
if (options.count("host") && !options["host"].defaulted())
|
||||
config().setString("host", options["host"].as<std::string>());
|
||||
if (options.count("host"))
|
||||
hosts_ports = options["host"].as<std::vector<HostPort>>();
|
||||
if (options.count("interleave-queries-file"))
|
||||
interleave_queries_files = options["interleave-queries-file"].as<std::vector<std::string>>();
|
||||
if (options.count("port") && !options["port"].defaulted())
|
||||
|
@ -1929,7 +1929,7 @@ void ClientBase::init(int argc, char ** argv)
|
||||
|
||||
/// Output of help message.
|
||||
if (options.count("help")
|
||||
|| (options.count("host") && options["host"].as<std::string>() == "elp")) /// If user writes -help instead of --help.
|
||||
|| (options.count("host") && options["host"].as<std::vector<HostPort>>()[0].host == "elp")) /// If user writes -help instead of --help.
|
||||
{
|
||||
printHelpMessage(options_description);
|
||||
exit(0);
|
||||
|
@ -5,6 +5,7 @@
|
||||
#include <Common/InterruptListener.h>
|
||||
#include <Common/ShellCommand.h>
|
||||
#include <Common/Stopwatch.h>
|
||||
#include <Common/DNSResolver.h>
|
||||
#include <Core/ExternalTable.h>
|
||||
#include <Poco/Util/Application.h>
|
||||
#include <Interpreters/Context.h>
|
||||
@ -243,6 +244,25 @@ protected:
|
||||
} profile_events;
|
||||
|
||||
QueryProcessingStage::Enum query_processing_stage;
|
||||
|
||||
struct HostPort
|
||||
{
|
||||
String host;
|
||||
std::optional<UInt16> port{};
|
||||
friend std::istream & operator>>(std::istream & in, HostPort & hostPort)
|
||||
{
|
||||
String host_with_port;
|
||||
in >> host_with_port;
|
||||
DB::DNSResolver & resolver = DB::DNSResolver::instance();
|
||||
std::pair<Poco::Net::IPAddress, std::optional<UInt16>>
|
||||
host_and_port = resolver.resolveHostOrAddress(host_with_port);
|
||||
hostPort.host = host_and_port.first.toString();
|
||||
hostPort.port = host_and_port.second;
|
||||
|
||||
return in;
|
||||
}
|
||||
};
|
||||
std::vector<HostPort> hosts_ports{};
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -23,15 +23,13 @@ namespace ErrorCodes
|
||||
extern const int BAD_ARGUMENTS;
|
||||
}
|
||||
|
||||
ConnectionParameters::ConnectionParameters(const Poco::Util::AbstractConfiguration & config)
|
||||
ConnectionParameters::ConnectionParameters(const Poco::Util::AbstractConfiguration & config,
|
||||
std::string connection_host,
|
||||
int connection_port) : host(connection_host), port(connection_port)
|
||||
{
|
||||
bool is_secure = config.getBool("secure", false);
|
||||
security = is_secure ? Protocol::Secure::Enable : Protocol::Secure::Disable;
|
||||
|
||||
host = config.getString("host", "localhost");
|
||||
port = config.getInt(
|
||||
"port", config.getInt(is_secure ? "tcp_port_secure" : "tcp_port", is_secure ? DBMS_DEFAULT_SECURE_PORT : DBMS_DEFAULT_PORT));
|
||||
|
||||
default_database = config.getString("database", "");
|
||||
|
||||
/// changed the default value to "default" to fix the issue when the user in the prompt is blank
|
||||
@ -61,12 +59,25 @@ ConnectionParameters::ConnectionParameters(const Poco::Util::AbstractConfigurati
|
||||
|
||||
/// By default compression is disabled if address looks like localhost.
|
||||
compression = config.getBool("compression", !isLocalAddress(DNSResolver::instance().resolveHost(host)))
|
||||
? Protocol::Compression::Enable : Protocol::Compression::Disable;
|
||||
? Protocol::Compression::Enable : Protocol::Compression::Disable;
|
||||
|
||||
timeouts = ConnectionTimeouts(
|
||||
Poco::Timespan(config.getInt("connect_timeout", DBMS_DEFAULT_CONNECT_TIMEOUT_SEC), 0),
|
||||
Poco::Timespan(config.getInt("send_timeout", DBMS_DEFAULT_SEND_TIMEOUT_SEC), 0),
|
||||
Poco::Timespan(config.getInt("receive_timeout", DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC), 0),
|
||||
Poco::Timespan(config.getInt("tcp_keep_alive_timeout", 0), 0));
|
||||
Poco::Timespan(config.getInt("connect_timeout", DBMS_DEFAULT_CONNECT_TIMEOUT_SEC), 0),
|
||||
Poco::Timespan(config.getInt("send_timeout", DBMS_DEFAULT_SEND_TIMEOUT_SEC), 0),
|
||||
Poco::Timespan(config.getInt("receive_timeout", DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC), 0),
|
||||
Poco::Timespan(config.getInt("tcp_keep_alive_timeout", 0), 0));
|
||||
}
|
||||
|
||||
ConnectionParameters::ConnectionParameters(const Poco::Util::AbstractConfiguration & config)
|
||||
: ConnectionParameters(config, config.getString("host", "localhost"), getPortFromConfig(config))
|
||||
{
|
||||
}
|
||||
|
||||
int ConnectionParameters::getPortFromConfig(const Poco::Util::AbstractConfiguration & config)
|
||||
{
|
||||
bool is_secure = config.getBool("secure", false);
|
||||
return config.getInt("port",
|
||||
config.getInt(is_secure ? "tcp_port_secure" : "tcp_port",
|
||||
is_secure ? DBMS_DEFAULT_SECURE_PORT : DBMS_DEFAULT_PORT));
|
||||
}
|
||||
}
|
||||
|
@ -24,6 +24,9 @@ struct ConnectionParameters
|
||||
|
||||
ConnectionParameters() {}
|
||||
ConnectionParameters(const Poco::Util::AbstractConfiguration & config);
|
||||
ConnectionParameters(const Poco::Util::AbstractConfiguration & config, std::string host, int port);
|
||||
|
||||
static int getPortFromConfig(const Poco::Util::AbstractConfiguration & config);
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -202,6 +202,45 @@ Poco::Net::SocketAddress DNSResolver::resolveAddress(const std::string & host, U
|
||||
return Poco::Net::SocketAddress(impl->cache_host(host).front(), port);
|
||||
}
|
||||
|
||||
std::pair<Poco::Net::IPAddress, std::optional<UInt16>> DNSResolver::resolveHostOrAddress(const std::string & host_and_port)
|
||||
{
|
||||
Poco::Net::IPAddress ip;
|
||||
|
||||
size_t number_of_colons = std::count(host_and_port.begin(), host_and_port.end(), ':');
|
||||
if (number_of_colons > 1)
|
||||
{
|
||||
/// IPv6 host
|
||||
if (host_and_port.starts_with('['))
|
||||
{
|
||||
size_t close_bracket_pos = host_and_port.find(']');
|
||||
assert(close_bracket_pos != std::string::npos);
|
||||
ip = resolveHost(host_and_port.substr(0, close_bracket_pos));
|
||||
|
||||
if (close_bracket_pos == host_and_port.size() - 1)
|
||||
return {ip, std::nullopt};
|
||||
if (host_and_port[close_bracket_pos + 1] != ':')
|
||||
throw Exception("Missing delimiter between host and port", ErrorCodes::BAD_ARGUMENTS);
|
||||
|
||||
unsigned int port;
|
||||
if (!Poco::NumberParser::tryParseUnsigned(host_and_port.substr(close_bracket_pos + 2), port))
|
||||
throw Exception("Port must be numeric", ErrorCodes::BAD_ARGUMENTS);
|
||||
if (port > 0xFFFF)
|
||||
throw Exception("Port must be less 0xFFFF", ErrorCodes::BAD_ARGUMENTS);
|
||||
return {ip, port};
|
||||
}
|
||||
return {resolveHost(host_and_port), std::nullopt};
|
||||
}
|
||||
else if (number_of_colons == 1)
|
||||
{
|
||||
/// IPv4 host with port
|
||||
Poco::Net::SocketAddress socket = resolveAddress(host_and_port);
|
||||
return {socket.host(), socket.port()};
|
||||
}
|
||||
|
||||
/// IPv4 host
|
||||
return {resolveHost(host_and_port), std::nullopt};
|
||||
}
|
||||
|
||||
String DNSResolver::reverseResolve(const Poco::Net::IPAddress & address)
|
||||
{
|
||||
if (impl->disable_cache)
|
||||
|
@ -34,6 +34,10 @@ public:
|
||||
|
||||
Poco::Net::SocketAddress resolveAddress(const std::string & host, UInt16 port);
|
||||
|
||||
/// Accepts host names like 'example.com'/'example.com:port' or '127.0.0.1'/'127.0.0.1:port' or '::1'/'[::1]:port'
|
||||
/// and resolves its IP and port, if port is set
|
||||
std::pair<Poco::Net::IPAddress, std::optional<UInt16>> resolveHostOrAddress(const std::string & host_and_port);
|
||||
|
||||
/// Accepts host IP and resolves its host name
|
||||
String reverseResolve(const Poco::Net::IPAddress & address);
|
||||
|
||||
|
@ -0,0 +1,11 @@
|
||||
1
|
||||
1
|
||||
1
|
||||
1
|
||||
1
|
||||
1
|
||||
1
|
||||
1
|
||||
1
|
||||
1
|
||||
1
|
51
tests/queries/0_stateless/02100_multiple_hosts_command_line_set.sh
Executable file
51
tests/queries/0_stateless/02100_multiple_hosts_command_line_set.sh
Executable file
@ -0,0 +1,51 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
# shellcheck source=../shell_config.sh
|
||||
. "$CURDIR"/../shell_config.sh
|
||||
|
||||
# default values test
|
||||
${CLICKHOUSE_CLIENT} --query "SELECT 1"
|
||||
|
||||
# backward compatibility test
|
||||
${CLICKHOUSE_CLIENT} --host "${CLICKHOUSE_HOST}" --port "${CLICKHOUSE_PORT_TCP}" --query "SELECT 1";
|
||||
|
||||
not_resolvable_host="notlocalhost"
|
||||
exception_msg="Cannot resolve host (${not_resolvable_host}), error 0: ${not_resolvable_host}.
|
||||
Code: 198. DB::Exception: Not found address of host: ${not_resolvable_host}. (DNS_ERROR)
|
||||
"
|
||||
error="$(${CLICKHOUSE_CLIENT} --host "${CLICKHOUSE_HOST}" "${not_resolvable_host}" --query "SELECT 1" 2>&1 > /dev/null)";
|
||||
[ "${error}" == "${exception_msg}" ]; echo "$?"
|
||||
|
||||
not_number_port="abc"
|
||||
exception_msg="Bad arguments: the argument ('${CLICKHOUSE_HOST}:${not_number_port}') for option '--host' is invalid."
|
||||
error="$(${CLICKHOUSE_CLIENT} --host "${CLICKHOUSE_HOST}:${not_number_port}" --query "SELECT 1" 2>&1 > /dev/null)";
|
||||
[ "${error}" == "${exception_msg}" ]; echo "$?"
|
||||
|
||||
not_alive_host="10.100.0.0"
|
||||
${CLICKHOUSE_CLIENT} --host "${not_alive_host}" "${CLICKHOUSE_HOST}" --query "SELECT 1";
|
||||
|
||||
not_alive_port="1"
|
||||
exception_msg="Code: 210. DB::NetException: Connection refused (${CLICKHOUSE_HOST}:${not_alive_port}). (NETWORK_ERROR)
|
||||
"
|
||||
error="$(${CLICKHOUSE_CLIENT} --host "${CLICKHOUSE_HOST}" --port "${not_alive_port}" --query "SELECT 1" 2>&1 > /dev/null)"
|
||||
[ "${error}" == "${exception_msg}" ]; echo "$?"
|
||||
${CLICKHOUSE_CLIENT} --host "${CLICKHOUSE_HOST}:${not_alive_port}" "${CLICKHOUSE_HOST}" --query "SELECT 1";
|
||||
${CLICKHOUSE_CLIENT} --host "${CLICKHOUSE_HOST}:${CLICKHOUSE_PORT_TCP}" --port "${not_alive_port}" --query "SELECT 1";
|
||||
|
||||
ipv6_host_without_brackets="2001:3984:3989::1:1000"
|
||||
exception_msg="Code: 210. DB::NetException: Connection refused (${ipv6_host_without_brackets}). (NETWORK_ERROR)
|
||||
"
|
||||
error="$(${CLICKHOUSE_CLIENT} --host "${ipv6_host_without_brackets}" --query "SELECT 1" 2>&1 > /dev/null)"
|
||||
[ "${error}" == "${exception_msg}" ]; echo "$?"
|
||||
|
||||
ipv6_host_with_brackets="[2001:3984:3989::1:1000]"
|
||||
exception_msg="Code: 210. DB::NetException: Connection refused (${ipv6_host_with_brackets}). (NETWORK_ERROR)
|
||||
"
|
||||
error="$(${CLICKHOUSE_CLIENT} --host "${ipv6_host_with_brackets}" --query "SELECT 1" 2>&1 > /dev/null)"
|
||||
[ "${error}" == "${exception_msg}" ]; echo "$?"
|
||||
|
||||
exception_msg="Code: 210. DB::NetException: Connection refused (${ipv6_host_with_brackets}:${not_alive_port}). (NETWORK_ERROR)
|
||||
"
|
||||
error="$(${CLICKHOUSE_CLIENT} --host "${ipv6_host_with_brackets}:${not_alive_port}" --query "SELECT 1" 2>&1 > /dev/null)"
|
||||
[ "${error}" == "${exception_msg}" ]; echo "$?"
|
Loading…
Reference in New Issue
Block a user