Merge pull request #65570 from ClickHouse/keeper_az

Improve ZooKeeper load balancing
Alexander Tokmakov 2024-06-28 22:57:20 +00:00, committed by GitHub
commit 4748e2929f
23 changed files with 501 additions and 280 deletions


@ -383,6 +383,9 @@ int KeeperClient::main(const std::vector<String> & /* args */)
for (const auto & key : keys)
{
if (key != "node")
continue;
String prefix = "zookeeper." + key;
String host = clickhouse_config.configuration->getString(prefix + ".host");
String port = clickhouse_config.configuration->getString(prefix + ".port");
@ -401,6 +404,7 @@ int KeeperClient::main(const std::vector<String> & /* args */)
zk_args.hosts.push_back(host + ":" + port);
}
zk_args.availability_zones.resize(zk_args.hosts.size());
zk_args.connection_timeout_ms = config().getInt("connection-timeout", 10) * 1000;
zk_args.session_timeout_ms = config().getInt("session-timeout", 10) * 1000;
zk_args.operation_timeout_ms = config().getInt("operation-timeout", 10) * 1000;
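keeper-client has no availability-zone data of its own, so it just sizes the vector with empty entries; the ZooKeeper client code below asserts that availability_zones and hosts stay the same length (see the chassert calls in ZooKeeper::init). A minimal standalone sketch of that invariant, with hypothetical values:

#include <cassert>
#include <string>
#include <vector>

int main()
{
    std::vector<std::string> hosts = {"zoo1:2181", "zoo2:2181"};
    std::vector<std::string> availability_zones;
    availability_zones.resize(hosts.size()); // an empty string means "AZ unknown"
    assert(availability_zones.size() == hosts.size());
}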


@ -355,10 +355,7 @@ try
std::string include_from_path = config().getString("include_from", "/etc/metrika.xml");
if (config().has(DB::PlacementInfo::PLACEMENT_CONFIG_PREFIX))
{
PlacementInfo::PlacementInfo::instance().initialize(config());
}
PlacementInfo::PlacementInfo::instance().initialize(config());
GlobalThreadPool::initialize(
/// We need a sufficient number of threads for connections + NuRaft workers + Keeper workers; 1000 is an estimate


@ -1003,6 +1003,8 @@ try
ServerUUID::load(path / "uuid", log);
PlacementInfo::PlacementInfo::instance().initialize(config());
zkutil::validateZooKeeperConfig(config());
bool has_zookeeper = zkutil::hasZooKeeperConfig(config());
@ -1817,11 +1819,6 @@ try
}
if (config().has(DB::PlacementInfo::PLACEMENT_CONFIG_PREFIX))
{
PlacementInfo::PlacementInfo::instance().initialize(config());
}
{
std::lock_guard lock(servers_lock);
/// We should start interserver communications before (and more important shutdown after) tables.


@ -60,4 +60,26 @@ GetPriorityForLoadBalancing::getPriorityFunc(LoadBalancing load_balance, size_t
return get_priority;
}
/// Some load balancing strategies (such as "nearest hostname") have preferred nodes to connect to.
/// Usually it's a node in the same data center/availability zone.
/// For other strategies there's no difference between nodes.
bool GetPriorityForLoadBalancing::hasOptimalNode() const
{
switch (load_balancing)
{
case LoadBalancing::NEAREST_HOSTNAME:
return true;
case LoadBalancing::HOSTNAME_LEVENSHTEIN_DISTANCE:
return true;
case LoadBalancing::IN_ORDER:
return false;
case LoadBalancing::RANDOM:
return false;
case LoadBalancing::FIRST_OR_RANDOM:
return true;
case LoadBalancing::ROUND_ROBIN:
return false;
}
}
}
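The switch above covers every LoadBalancing value and has no default branch, so compilers that warn on non-exhaustive enum switches will flag any newly added strategy until this function is updated. A standalone sketch of the same pattern, with a hypothetical (trimmed) enum rather than the real one:

#include <cstdio>

enum class LoadBalancing { NearestHostname, InOrder, Random, FirstOrRandom, RoundRobin };

static bool hasOptimalNode(LoadBalancing lb)
{
    switch (lb) // no default: -Wswitch-style warnings catch unhandled enumerators
    {
        case LoadBalancing::NearestHostname: return true;  // prefers the "closest" hostname
        case LoadBalancing::FirstOrRandom:   return true;  // prefers the first host
        case LoadBalancing::InOrder:         return false;
        case LoadBalancing::Random:          return false;
        case LoadBalancing::RoundRobin:      return false;
    }
    return false; // unreachable while the switch stays exhaustive
}

int main()
{
    std::printf("%d\n", hasOptimalNode(LoadBalancing::NearestHostname)); // prints 1
}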


@ -30,6 +30,8 @@ public:
Func getPriorityFunc(LoadBalancing load_balance, size_t offset, size_t pool_size) const;
bool hasOptimalNode() const;
std::vector<size_t> hostname_prefix_distance; /// Prefix distances from this host's name to the names of the pools' hosts.
std::vector<size_t> hostname_levenshtein_distance; /// Levenshtein distances from this host's name to the names of the pools' hosts.


@ -559,6 +559,8 @@ public:
/// Useful to check owner of ephemeral node.
virtual int64_t getSessionID() const = 0;
virtual String tryGetAvailabilityZone() { return ""; }
/// If the method throws an exception, callbacks won't be called.
///
/// After the method is executed successfully, you must wait for callbacks
@ -635,10 +637,6 @@ public:
virtual const DB::KeeperFeatureFlags * getKeeperFeatureFlags() const { return nullptr; }
/// A ZooKeeper session can have an optional deadline set on it.
/// After it has been reached, the session needs to be finalized.
virtual bool hasReachedDeadline() const = 0;
/// Expire session and finish all pending requests
virtual void finalize(const String & reason) = 0;
};


@ -39,7 +39,6 @@ public:
~TestKeeper() override;
bool isExpired() const override { return expired; }
bool hasReachedDeadline() const override { return false; }
Int8 getConnectedNodeIdx() const override { return 0; }
String getConnectedHostPort() const override { return "TestKeeper:0000"; }
int32_t getConnectionXid() const override { return 0; }


@ -8,6 +8,7 @@
#include <functional>
#include <ranges>
#include <vector>
#include <chrono>
#include <Common/ZooKeeper/Types.h>
#include <Common/ZooKeeper/ZooKeeperCommon.h>
@ -16,10 +17,12 @@
#include <base/sort.h>
#include <base/getFQDNOrHostName.h>
#include <Core/ServerUUID.h>
#include <Core/BackgroundSchedulePool.h>
#include "Common/ZooKeeper/IKeeper.h"
#include <Common/DNSResolver.h>
#include <Common/StringUtils.h>
#include <Common/Exception.h>
#include <Interpreters/Context.h>
#include <Poco/Net/NetException.h>
#include <Poco/Net/DNS.h>
@ -55,70 +58,120 @@ static void check(Coordination::Error code, const std::string & path)
throw KeeperException::fromPath(code, path);
}
UInt64 getSecondsUntilReconnect(const ZooKeeperArgs & args)
{
std::uniform_int_distribution<UInt32> fallback_session_lifetime_distribution
{
args.fallback_session_lifetime.min_sec,
args.fallback_session_lifetime.max_sec,
};
UInt32 session_lifetime_seconds = fallback_session_lifetime_distribution(thread_local_rng);
return session_lifetime_seconds;
}
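Drawing the fallback lifetime from a uniform range rather than using a fixed value spreads reconnect attempts out in time: a fleet of clients that all failed over to the same node will not expire their sessions in one synchronized burst. A minimal standalone illustration of the idea, with hypothetical bounds:

#include <cstdio>
#include <random>

int main()
{
    std::mt19937 rng{std::random_device{}()};
    // Hypothetical min/max; the real values come from <fallback_session_lifetime> in the config.
    std::uniform_int_distribution<unsigned> lifetime{60, 300};
    for (int client = 0; client < 5; ++client)
        std::printf("client %d reconnects after %u s\n", client, lifetime(rng));
}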
void ZooKeeper::init(ZooKeeperArgs args_)
void ZooKeeper::updateAvailabilityZones()
{
ShuffleHosts shuffled_hosts = shuffleHosts();
for (const auto & node : shuffled_hosts)
{
try
{
ShuffleHosts single_node{node};
auto tmp_impl = std::make_unique<Coordination::ZooKeeper>(single_node, args, zk_log);
auto idx = node.original_index;
availability_zones[idx] = tmp_impl->tryGetAvailabilityZone();
LOG_TEST(log, "Got availability zone for {}: {}", args.hosts[idx], availability_zones[idx]);
}
catch (...)
{
DB::tryLogCurrentException(log, "Failed to get availability zone for " + node.host);
}
}
LOG_DEBUG(log, "Updated availability zones: [{}]", fmt::join(availability_zones, ", "));
}
void ZooKeeper::init(ZooKeeperArgs args_, std::unique_ptr<Coordination::IKeeper> existing_impl)
{
args = std::move(args_);
log = getLogger("ZooKeeper");
if (args.implementation == "zookeeper")
if (existing_impl)
{
chassert(args.implementation == "zookeeper");
impl = std::move(existing_impl);
LOG_INFO(log, "Switching to connection to a more optimal node {}", impl->getConnectedHostPort());
}
else if (args.implementation == "zookeeper")
{
if (args.hosts.empty())
throw KeeperException::fromMessage(Coordination::Error::ZBADARGUMENTS, "No hosts passed to ZooKeeper constructor.");
Coordination::ZooKeeper::Nodes nodes;
nodes.reserve(args.hosts.size());
chassert(args.availability_zones.size() == args.hosts.size());
if (availability_zones.empty())
{
/// availability_zones is empty on server startup or after config reloading
/// We will keep the az info when starting new sessions
availability_zones = args.availability_zones;
LOG_TEST(log, "Availability zones from config: [{}], client: {}", fmt::join(availability_zones, ", "), args.client_availability_zone);
if (args.availability_zone_autodetect)
updateAvailabilityZones();
}
chassert(availability_zones.size() == args.hosts.size());
/// Shuffle the hosts to distribute the load among ZooKeeper nodes.
std::vector<ShuffleHost> shuffled_hosts = shuffleHosts();
ShuffleHosts shuffled_hosts = shuffleHosts();
bool dns_error = false;
for (auto & host : shuffled_hosts)
{
auto & host_string = host.host;
try
{
const bool secure = startsWith(host_string, "secure://");
if (secure)
host_string.erase(0, strlen("secure://"));
/// We want to resolve all hosts without DNS cache for keeper connection.
Coordination::DNSResolver::instance().removeHostFromCache(host_string);
const Poco::Net::SocketAddress host_socket_addr{host_string};
LOG_TEST(log, "Adding ZooKeeper host {} ({})", host_string, host_socket_addr.toString());
nodes.emplace_back(Coordination::ZooKeeper::Node{host_socket_addr, host.original_index, secure});
}
catch (const Poco::Net::HostNotFoundException & e)
{
/// Most likely it's a misconfiguration and a wrong hostname was specified
LOG_ERROR(log, "Cannot use ZooKeeper host {}, reason: {}", host_string, e.displayText());
}
catch (const Poco::Net::DNSException & e)
{
/// Most likely DNS is not available now
dns_error = true;
LOG_ERROR(log, "Cannot use ZooKeeper host {} due to DNS error: {}", host_string, e.displayText());
}
}
if (nodes.empty())
{
/// For DNS errors we throw an exception with the ZCONNECTIONLOSS code, so it will be treated as a hardware error, not a user error
if (dns_error)
throw KeeperException::fromMessage(Coordination::Error::ZCONNECTIONLOSS, "Cannot resolve any of provided ZooKeeper hosts due to DNS error");
else
throw KeeperException::fromMessage(Coordination::Error::ZCONNECTIONLOSS, "Cannot use any of provided ZooKeeper nodes");
}
impl = std::make_unique<Coordination::ZooKeeper>(nodes, args, zk_log);
impl = std::make_unique<Coordination::ZooKeeper>(shuffled_hosts, args, zk_log);
Int8 node_idx = impl->getConnectedNodeIdx();
if (args.chroot.empty())
LOG_TRACE(log, "Initialized, hosts: {}", fmt::join(args.hosts, ","));
else
LOG_TRACE(log, "Initialized, hosts: {}, chroot: {}", fmt::join(args.hosts, ","), args.chroot);
/// If the balancing strategy has an optimal node then it will be the first in the list
bool connected_to_suboptimal_node = node_idx != shuffled_hosts[0].original_index;
bool respect_az = args.prefer_local_availability_zone && !args.client_availability_zone.empty();
bool may_benefit_from_reconnecting = respect_az || args.get_priority_load_balancing.hasOptimalNode();
if (connected_to_suboptimal_node && may_benefit_from_reconnecting)
{
auto reconnect_timeout_sec = getSecondsUntilReconnect(args);
LOG_DEBUG(log, "Connected to a suboptimal ZooKeeper host ({}, index {})."
" To preserve balance in ZooKeeper usage, this ZooKeeper session will expire in {} seconds",
impl->getConnectedHostPort(), node_idx, reconnect_timeout_sec);
auto reconnect_task_holder = DB::Context::getGlobalContextInstance()->getSchedulePool().createTask("ZKReconnect", [this, optimal_host = shuffled_hosts[0]]()
{
try
{
LOG_DEBUG(log, "Trying to connect to a more optimal node {}", optimal_host.host);
ShuffleHosts node{optimal_host};
std::unique_ptr<Coordination::IKeeper> new_impl = std::make_unique<Coordination::ZooKeeper>(node, args, zk_log);
Int8 new_node_idx = new_impl->getConnectedNodeIdx();
/// Maybe the node was unavailable when we fetched the AZs the first time; update it just in case
if (args.availability_zone_autodetect && availability_zones[new_node_idx].empty())
{
availability_zones[new_node_idx] = new_impl->tryGetAvailabilityZone();
LOG_DEBUG(log, "Got availability zone for {}: {}", optimal_host.host, availability_zones[new_node_idx]);
}
optimal_impl = std::move(new_impl);
impl->finalize("Connected to a more optimal node");
}
catch (...)
{
LOG_WARNING(log, "Failed to connect to a more optimal ZooKeeper, will try again later: {}", DB::getCurrentExceptionMessage(/*with_stacktrace*/ false));
(*reconnect_task)->scheduleAfter(getSecondsUntilReconnect(args) * 1000);
}
});
reconnect_task = std::make_unique<DB::BackgroundSchedulePoolTaskHolder>(std::move(reconnect_task_holder));
(*reconnect_task)->activate();
(*reconnect_task)->scheduleAfter(reconnect_timeout_sec * 1000);
}
}
else if (args.implementation == "testkeeper")
{
@ -152,29 +205,53 @@ void ZooKeeper::init(ZooKeeperArgs args_)
}
}
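The reconnect machinery above only kicks in when two things hold: the session landed on a host other than the first (best) entry of the shuffled list, and the configuration actually defines a "best" host, either through an AZ preference or through a load-balancing strategy with an optimal node. A condensed standalone sketch of that decision, with hypothetical types:

#include <cstdio>
#include <string>

struct Args
{
    bool prefer_local_az = true;
    std::string client_az = "az2";     // empty when autodetection found nothing
    bool lb_has_optimal_node = false;  // what GetPriorityForLoadBalancing::hasOptimalNode() reports
};

static bool shouldScheduleReconnect(const Args & args, int connected_idx, int optimal_idx)
{
    bool suboptimal = connected_idx != optimal_idx;
    bool respect_az = args.prefer_local_az && !args.client_az.empty();
    return suboptimal && (respect_az || args.lb_has_optimal_node);
}

int main()
{
    Args args;
    std::printf("%d\n", shouldScheduleReconnect(args, /*connected*/ 2, /*optimal*/ 0)); // prints 1
}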
ZooKeeper::~ZooKeeper()
{
if (reconnect_task)
(*reconnect_task)->deactivate();
}
ZooKeeper::ZooKeeper(const ZooKeeperArgs & args_, std::shared_ptr<DB::ZooKeeperLog> zk_log_)
: zk_log(std::move(zk_log_))
{
init(args_);
init(args_, /*existing_impl*/ {});
}
ZooKeeper::ZooKeeper(const ZooKeeperArgs & args_, std::shared_ptr<DB::ZooKeeperLog> zk_log_, Strings availability_zones_, std::unique_ptr<Coordination::IKeeper> existing_impl)
: availability_zones(std::move(availability_zones_)), zk_log(std::move(zk_log_))
{
if (availability_zones.size() != args_.hosts.size())
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Argument sizes mismatch: availability_zones count {} and hosts count {}",
availability_zones.size(), args_.hosts.size());
init(args_, std::move(existing_impl));
}
ZooKeeper::ZooKeeper(const Poco::Util::AbstractConfiguration & config, const std::string & config_name, std::shared_ptr<DB::ZooKeeperLog> zk_log_)
: zk_log(std::move(zk_log_))
{
init(ZooKeeperArgs(config, config_name));
init(ZooKeeperArgs(config, config_name), /*existing_impl*/ {});
}
std::vector<ShuffleHost> ZooKeeper::shuffleHosts() const
ShuffleHosts ZooKeeper::shuffleHosts() const
{
std::function<Priority(size_t index)> get_priority = args.get_priority_load_balancing.getPriorityFunc(args.get_priority_load_balancing.load_balancing, 0, args.hosts.size());
std::vector<ShuffleHost> shuffle_hosts;
std::function<Priority(size_t index)> get_priority = args.get_priority_load_balancing.getPriorityFunc(
args.get_priority_load_balancing.load_balancing, /* offset for first_or_random */ 0, args.hosts.size());
ShuffleHosts shuffle_hosts;
for (size_t i = 0; i < args.hosts.size(); ++i)
{
ShuffleHost shuffle_host;
shuffle_host.host = args.hosts[i];
shuffle_host.original_index = static_cast<UInt8>(i);
shuffle_host.secure = startsWith(shuffle_host.host, "secure://");
if (shuffle_host.secure)
shuffle_host.host.erase(0, strlen("secure://"));
if (!args.client_availability_zone.empty() && !availability_zones[i].empty())
shuffle_host.az_info = availability_zones[i] == args.client_availability_zone ? ShuffleHost::SAME : ShuffleHost::OTHER;
if (get_priority)
shuffle_host.priority = get_priority(i);
shuffle_host.randomize();
@ -1023,7 +1100,10 @@ ZooKeeperPtr ZooKeeper::create(const Poco::Util::AbstractConfiguration & config,
ZooKeeperPtr ZooKeeper::startNewSession() const
{
auto res = std::shared_ptr<ZooKeeper>(new ZooKeeper(args, zk_log));
if (reconnect_task)
(*reconnect_task)->deactivate();
auto res = std::shared_ptr<ZooKeeper>(new ZooKeeper(args, zk_log, availability_zones, std::move(optimal_impl)));
res->initSession();
return res;
}
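Note the hand-off here: the background ZKReconnect task has already dialed the better host and stashed the connection in optimal_impl, so startNewSession() can adopt it by move instead of connecting from scratch. A standalone sketch of the pattern, with a hypothetical Connection type:

#include <cstdio>
#include <memory>

struct Connection { const char * host; };

struct Session
{
    std::unique_ptr<Connection> impl;
    mutable std::unique_ptr<Connection> optimal_impl; // filled in by a background task

    Session startNewSession() const
    {
        Session next;
        if (optimal_impl)
            next.impl = std::move(optimal_impl); // adopt the pre-established connection
        else
            next.impl = std::make_unique<Connection>(Connection{"freshly-dialed"});
        return next;
    }
};

int main()
{
    Session s;
    s.optimal_impl = std::make_unique<Connection>(Connection{"zoo-in-local-az"});
    std::printf("%s\n", s.startNewSession().impl->host); // prints zoo-in-local-az
}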
@ -1456,6 +1536,16 @@ int32_t ZooKeeper::getConnectionXid() const
return impl->getConnectionXid();
}
String ZooKeeper::getConnectedHostAvailabilityZone() const
{
if (args.implementation != "zookeeper" || !impl)
return "";
Int8 idx = impl->getConnectedNodeIdx();
if (idx < 0)
return ""; /// session expired
return availability_zones.at(idx);
}
size_t getFailedOpIndex(Coordination::Error exception_code, const Coordination::Responses & responses)
{
if (responses.empty())


@ -32,6 +32,7 @@ namespace DB
{
class ZooKeeperLog;
class ZooKeeperWithFaultInjection;
class BackgroundSchedulePoolTaskHolder;
namespace ErrorCodes
{
@ -48,11 +49,23 @@ constexpr size_t MULTI_BATCH_SIZE = 100;
struct ShuffleHost
{
enum AvailabilityZoneInfo
{
SAME = 0,
UNKNOWN = 1,
OTHER = 2,
};
String host;
bool secure = false;
UInt8 original_index = 0;
AvailabilityZoneInfo az_info = UNKNOWN;
Priority priority;
UInt64 random = 0;
/// We should resolve it each time without caching
mutable std::optional<Poco::Net::SocketAddress> address;
void randomize()
{
random = thread_local_rng();
@ -60,11 +73,13 @@ struct ShuffleHost
static bool compare(const ShuffleHost & lhs, const ShuffleHost & rhs)
{
return std::forward_as_tuple(lhs.priority, lhs.random)
< std::forward_as_tuple(rhs.priority, rhs.random);
return std::forward_as_tuple(lhs.az_info, lhs.priority, lhs.random)
< std::forward_as_tuple(rhs.az_info, rhs.priority, rhs.random);
}
};
using ShuffleHosts = std::vector<ShuffleHost>;
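Because SAME = 0 sorts before UNKNOWN = 1 and OTHER = 2, the availability-zone match dominates the ordering; the load-balancing priority and the random tiebreaker only matter within each AZ bucket. A minimal standalone sketch of the comparison:

#include <algorithm>
#include <cstdio>
#include <tuple>
#include <vector>

enum AzInfo { SAME = 0, UNKNOWN = 1, OTHER = 2 };

struct Host { const char * name; AzInfo az; int priority; unsigned random; };

int main()
{
    std::vector<Host> hosts = {
        {"zoo1", OTHER,   0, 7},
        {"zoo2", SAME,    0, 9}, // same AZ as the client: wins regardless of `random`
        {"zoo3", UNKNOWN, 0, 1},
    };
    std::sort(hosts.begin(), hosts.end(), [](const Host & l, const Host & r)
    {
        return std::forward_as_tuple(l.az, l.priority, l.random)
             < std::forward_as_tuple(r.az, r.priority, r.random);
    });
    for (const auto & h : hosts)
        std::printf("%s\n", h.name); // zoo2, zoo3, zoo1
}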
struct RemoveException
{
explicit RemoveException(std::string_view path_ = "", bool remove_subtree_ = true)
@ -197,6 +212,9 @@ class ZooKeeper
explicit ZooKeeper(const ZooKeeperArgs & args_, std::shared_ptr<DB::ZooKeeperLog> zk_log_ = nullptr);
/// Allows keeping info about availability zones when starting a new session
ZooKeeper(const ZooKeeperArgs & args_, std::shared_ptr<DB::ZooKeeperLog> zk_log_, Strings availability_zones_, std::unique_ptr<Coordination::IKeeper> existing_impl);
/** Config of the form:
<zookeeper>
<node>
@ -228,7 +246,9 @@ public:
using Ptr = std::shared_ptr<ZooKeeper>;
using ErrorsList = std::initializer_list<Coordination::Error>;
std::vector<ShuffleHost> shuffleHosts() const;
~ZooKeeper();
ShuffleHosts shuffleHosts() const;
static Ptr create(const Poco::Util::AbstractConfiguration & config,
const std::string & config_name,
@ -596,8 +616,6 @@ public:
UInt32 getSessionUptime() const { return static_cast<UInt32>(session_uptime.elapsedSeconds()); }
bool hasReachedDeadline() const { return impl->hasReachedDeadline(); }
uint64_t getSessionTimeoutMS() const { return args.session_timeout_ms; }
void setServerCompletelyStarted();
@ -606,6 +624,8 @@ public:
String getConnectedHostPort() const;
int32_t getConnectionXid() const;
String getConnectedHostAvailabilityZone() const;
const DB::KeeperFeatureFlags * getKeeperFeatureFlags() const { return impl->getKeeperFeatureFlags(); }
/// Checks that our session was not killed, and allows to avoid applying a request from an old lost session.
@ -625,7 +645,8 @@ public:
void addCheckSessionOp(Coordination::Requests & requests) const;
private:
void init(ZooKeeperArgs args_);
void init(ZooKeeperArgs args_, std::unique_ptr<Coordination::IKeeper> existing_impl);
void updateAvailabilityZones();
/// The following methods don't throw exceptions; they return error codes instead.
Coordination::Error createImpl(const std::string & path, const std::string & data, int32_t mode, std::string & path_created);
@ -690,15 +711,20 @@ private:
}
std::unique_ptr<Coordination::IKeeper> impl;
mutable std::unique_ptr<Coordination::IKeeper> optimal_impl;
ZooKeeperArgs args;
Strings availability_zones;
LoggerPtr log = nullptr;
std::shared_ptr<DB::ZooKeeperLog> zk_log;
AtomicStopwatch session_uptime;
int32_t session_node_version;
std::unique_ptr<DB::BackgroundSchedulePoolTaskHolder> reconnect_task;
};
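BackgroundSchedulePoolTaskHolder is only forward-declared at the top of this header, which is why reconnect_task is held through std::unique_ptr and why ~ZooKeeper() is declared here but defined in the .cpp, where the type is complete. A standalone sketch of this classic incomplete-type pattern, with hypothetical names:

#include <memory>

class HeavyTask; // forward declaration: the definition lives "in the .cpp"

class Widget
{
public:
    Widget();
    ~Widget(); // must be defined where HeavyTask is complete
private:
    std::unique_ptr<HeavyTask> task;
};

// The ".cpp" part of the sketch: here the type is complete, so the
// unique_ptr's deleter can be instantiated.
class HeavyTask { /* stands in for a heavy dependency */ };
Widget::Widget() : task(std::make_unique<HeavyTask>()) {}
Widget::~Widget() = default;

int main() { Widget w; }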


@ -5,6 +5,9 @@
#include <Poco/Util/AbstractConfiguration.h>
#include <Common/isLocalAddress.h>
#include <Common/StringUtils.h>
#include <Common/thread_local_rng.h>
#include <Server/CloudPlacementInfo.h>
#include <IO/S3/Credentials.h>
#include <Poco/String.h>
namespace DB
@ -53,6 +56,7 @@ ZooKeeperArgs::ZooKeeperArgs(const Poco::Util::AbstractConfiguration & config, c
ZooKeeperArgs::ZooKeeperArgs(const String & hosts_string)
{
splitInto<','>(hosts, hosts_string);
availability_zones.resize(hosts.size());
}
void ZooKeeperArgs::initFromKeeperServerSection(const Poco::Util::AbstractConfiguration & config)
@ -103,8 +107,11 @@ void ZooKeeperArgs::initFromKeeperServerSection(const Poco::Util::AbstractConfig
for (const auto & key : keys)
{
if (startsWith(key, "server"))
{
hosts.push_back(
(secure ? "secure://" : "") + config.getString(raft_configuration_key + "." + key + ".hostname") + ":" + tcp_port);
availability_zones.push_back(config.getString(raft_configuration_key + "." + key + ".availability_zone", ""));
}
}
static constexpr std::array load_balancing_keys
@ -123,11 +130,15 @@ void ZooKeeperArgs::initFromKeeperServerSection(const Poco::Util::AbstractConfig
auto load_balancing = magic_enum::enum_cast<DB::LoadBalancing>(Poco::toUpper(load_balancing_str));
if (!load_balancing)
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unknown load balancing: {}", load_balancing_str);
get_priority_load_balancing.load_balancing = *load_balancing;
get_priority_load_balancing = DB::GetPriorityForLoadBalancing(*load_balancing, thread_local_rng() % hosts.size());
break;
}
}
availability_zone_autodetect = config.getBool(std::string{config_name} + ".availability_zone_autodetect", false);
prefer_local_availability_zone = config.getBool(std::string{config_name} + ".prefer_local_availability_zone", false);
if (prefer_local_availability_zone)
client_availability_zone = DB::PlacementInfo::PlacementInfo::instance().getAvailabilityZone();
}
void ZooKeeperArgs::initFromKeeperSection(const Poco::Util::AbstractConfiguration & config, const std::string & config_name)
@ -137,6 +148,8 @@ void ZooKeeperArgs::initFromKeeperSection(const Poco::Util::AbstractConfiguratio
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_name, keys);
std::optional<DB::LoadBalancing> load_balancing;
for (const auto & key : keys)
{
if (key.starts_with("node"))
@ -144,6 +157,7 @@ void ZooKeeperArgs::initFromKeeperSection(const Poco::Util::AbstractConfiguratio
hosts.push_back(
(config.getBool(config_name + "." + key + ".secure", false) ? "secure://" : "")
+ config.getString(config_name + "." + key + ".host") + ":" + config.getString(config_name + "." + key + ".port", "2181"));
availability_zones.push_back(config.getString(config_name + "." + key + ".availability_zone", ""));
}
else if (key == "session_timeout_ms")
{
@ -199,6 +213,10 @@ void ZooKeeperArgs::initFromKeeperSection(const Poco::Util::AbstractConfiguratio
{
sessions_path = config.getString(config_name + "." + key);
}
else if (key == "prefer_local_availability_zone")
{
prefer_local_availability_zone = config.getBool(config_name + "." + key);
}
else if (key == "implementation")
{
implementation = config.getString(config_name + "." + key);
@ -207,10 +225,9 @@ void ZooKeeperArgs::initFromKeeperSection(const Poco::Util::AbstractConfiguratio
{
String load_balancing_str = config.getString(config_name + "." + key);
/// Use magic_enum to avoid dependency from dbms (`SettingFieldLoadBalancingTraits::fromString(...)`)
auto load_balancing = magic_enum::enum_cast<DB::LoadBalancing>(Poco::toUpper(load_balancing_str));
load_balancing = magic_enum::enum_cast<DB::LoadBalancing>(Poco::toUpper(load_balancing_str));
if (!load_balancing)
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unknown load balancing: {}", load_balancing_str);
get_priority_load_balancing.load_balancing = *load_balancing;
}
else if (key == "fallback_session_lifetime")
{
@ -224,9 +241,19 @@ void ZooKeeperArgs::initFromKeeperSection(const Poco::Util::AbstractConfiguratio
{
use_compression = config.getBool(config_name + "." + key);
}
else if (key == "availability_zone_autodetect")
{
availability_zone_autodetect = config.getBool(config_name + "." + key);
}
else
throw KeeperException(Coordination::Error::ZBADARGUMENTS, "Unknown key {} in config file", key);
}
if (load_balancing)
get_priority_load_balancing = DB::GetPriorityForLoadBalancing(*load_balancing, thread_local_rng() % hosts.size());
if (prefer_local_availability_zone)
client_availability_zone = DB::PlacementInfo::PlacementInfo::instance().getAvailabilityZone();
}
}
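The load-balancing key is now remembered in a std::optional and only converted into a GetPriorityForLoadBalancing after the key loop, because the random offset for first_or_random is drawn modulo hosts.size(), which is unknown until every <node> entry has been parsed. A standalone sketch of this two-pass shape, with a hypothetical flat key/value config:

#include <cstdio>
#include <optional>
#include <random>
#include <string>
#include <utility>
#include <vector>

int main()
{
    std::vector<std::pair<std::string, std::string>> config = {
        {"node1", "zoo1:2181"},
        {"zookeeper_load_balancing", "first_or_random"},
        {"node2", "zoo2:2181"}, // may appear after the load-balancing key
    };

    std::vector<std::string> hosts;
    std::optional<std::string> load_balancing;
    for (const auto & [key, value] : config)
    {
        if (key.rfind("node", 0) == 0)
            hosts.push_back(value);
        else if (key == "zookeeper_load_balancing")
            load_balancing = value; // can't pick an offset yet: more nodes may follow
    }

    if (load_balancing && !hosts.empty())
    {
        std::mt19937 rng{std::random_device{}()};
        size_t offset = rng() % hosts.size(); // only valid once all hosts are known
        std::printf("%s, offset %zu of %zu hosts\n", load_balancing->c_str(), offset, hosts.size());
    }
}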

View File

@ -32,10 +32,12 @@ struct ZooKeeperArgs
String zookeeper_name = "zookeeper";
String implementation = "zookeeper";
Strings hosts;
Strings availability_zones;
String auth_scheme;
String identity;
String chroot;
String sessions_path = "/clickhouse/sessions";
String client_availability_zone;
int32_t connection_timeout_ms = Coordination::DEFAULT_CONNECTION_TIMEOUT_MS;
int32_t session_timeout_ms = Coordination::DEFAULT_SESSION_TIMEOUT_MS;
int32_t operation_timeout_ms = Coordination::DEFAULT_OPERATION_TIMEOUT_MS;
@ -47,6 +49,8 @@ struct ZooKeeperArgs
UInt64 send_sleep_ms = 0;
UInt64 recv_sleep_ms = 0;
bool use_compression = false;
bool prefer_local_availability_zone = false;
bool availability_zone_autodetect = false;
SessionLifetimeConfiguration fallback_session_lifetime = {};
DB::GetPriorityForLoadBalancing get_priority_load_balancing;


@ -23,6 +23,9 @@
#include <Common/setThreadName.h>
#include <Common/thread_local_rng.h>
#include <Poco/Net/NetException.h>
#include <Poco/Net/DNS.h>
#include "Coordination/KeeperConstants.h"
#include "config.h"
@ -338,7 +341,7 @@ ZooKeeper::~ZooKeeper()
ZooKeeper::ZooKeeper(
const Nodes & nodes,
const zkutil::ShuffleHosts & nodes,
const zkutil::ZooKeeperArgs & args_,
std::shared_ptr<ZooKeeperLog> zk_log_)
: args(args_)
@ -426,7 +429,7 @@ ZooKeeper::ZooKeeper(
void ZooKeeper::connect(
const Nodes & nodes,
const zkutil::ShuffleHosts & nodes,
Poco::Timespan connection_timeout)
{
if (nodes.empty())
@ -434,15 +437,51 @@ void ZooKeeper::connect(
static constexpr size_t num_tries = 3;
bool connected = false;
bool dns_error = false;
size_t resolved_count = 0;
for (const auto & node : nodes)
{
try
{
const Poco::Net::SocketAddress host_socket_addr{node.host};
LOG_TRACE(log, "Adding ZooKeeper host {} ({}), az: {}, priority: {}", node.host, host_socket_addr.toString(), node.az_info, node.priority);
node.address = host_socket_addr;
++resolved_count;
}
catch (const Poco::Net::HostNotFoundException & e)
{
/// Most likely it's a misconfiguration and a wrong hostname was specified
LOG_ERROR(log, "Cannot use ZooKeeper host {}, reason: {}", node.host, e.displayText());
}
catch (const Poco::Net::DNSException & e)
{
/// Most likely DNS is not available now
dns_error = true;
LOG_ERROR(log, "Cannot use ZooKeeper host {} due to DNS error: {}", node.host, e.displayText());
}
}
if (resolved_count == 0)
{
/// For DNS errors we throw an exception with the ZCONNECTIONLOSS code, so it will be treated as a hardware error, not a user error
if (dns_error)
throw zkutil::KeeperException::fromMessage(
Coordination::Error::ZCONNECTIONLOSS, "Cannot resolve any of provided ZooKeeper hosts due to DNS error");
else
throw zkutil::KeeperException::fromMessage(Coordination::Error::ZCONNECTIONLOSS, "Cannot use any of provided ZooKeeper nodes");
}
WriteBufferFromOwnString fail_reasons;
for (size_t try_no = 0; try_no < num_tries; ++try_no)
{
for (size_t i = 0; i < nodes.size(); ++i)
for (const auto & node : nodes)
{
const auto & node = nodes[i];
try
{
if (!node.address)
continue;
/// Reset the state of previous attempt.
if (node.secure)
{
@ -458,7 +497,7 @@ void ZooKeeper::connect(
socket = Poco::Net::StreamSocket();
}
socket.connect(node.address, connection_timeout);
socket.connect(*node.address, connection_timeout);
socket_address = socket.peerAddress();
socket.setReceiveTimeout(args.operation_timeout_ms * 1000);
@ -498,27 +537,11 @@ void ZooKeeper::connect(
}
original_index = static_cast<Int8>(node.original_index);
if (i != 0)
{
std::uniform_int_distribution<UInt32> fallback_session_lifetime_distribution
{
args.fallback_session_lifetime.min_sec,
args.fallback_session_lifetime.max_sec,
};
UInt32 session_lifetime_seconds = fallback_session_lifetime_distribution(thread_local_rng);
client_session_deadline = clock::now() + std::chrono::seconds(session_lifetime_seconds);
LOG_DEBUG(log, "Connected to a suboptimal ZooKeeper host ({}, index {})."
" To preserve balance in ZooKeeper usage, this ZooKeeper session will expire in {} seconds",
node.address.toString(), i, session_lifetime_seconds);
}
break;
}
catch (...)
{
fail_reasons << "\n" << getCurrentExceptionMessage(false) << ", " << node.address.toString();
fail_reasons << "\n" << getCurrentExceptionMessage(false) << ", " << node.address->toString();
}
}
@ -532,6 +555,9 @@ void ZooKeeper::connect(
bool first = true;
for (const auto & node : nodes)
{
if (!node.address)
continue;
if (first)
first = false;
else
@ -540,7 +566,7 @@ void ZooKeeper::connect(
if (node.secure)
message << "secure://";
message << node.address.toString();
message << node.address->toString();
}
message << fail_reasons.str() << "\n";
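connect() is now split into two phases: every host is resolved once up front, with the result cached in the node's mutable optional address, and the retry loop then skips any host whose optional stayed empty. A condensed standalone sketch of that shape, with simulated DNS:

#include <cstdio>
#include <optional>
#include <string>
#include <vector>

struct Node { std::string host; mutable std::optional<std::string> address; };

static std::optional<std::string> resolve(const std::string & host)
{
    if (host == "bad-host")
        return std::nullopt;      // simulated DNS failure
    return host + " -> 10.0.0.1"; // simulated resolution
}

int main()
{
    std::vector<Node> nodes = {{"zoo1", {}}, {"bad-host", {}}, {"zoo2", {}}};

    size_t resolved = 0;
    for (const auto & node : nodes)
        if ((node.address = resolve(node.host)))
            ++resolved;
    if (resolved == 0)
        return 1; // the real code throws ZCONNECTIONLOSS here

    for (int try_no = 0; try_no < 3; ++try_no)
        for (const auto & node : nodes)
        {
            if (!node.address)
                continue; // unresolved hosts never take part in retries
            std::printf("try %d: connecting to %s\n", try_no, node.address->c_str());
        }
}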
@ -1153,7 +1179,6 @@ void ZooKeeper::pushRequest(RequestInfo && info)
{
try
{
checkSessionDeadline();
info.time = clock::now();
auto maybe_zk_log = std::atomic_load(&zk_log);
if (maybe_zk_log)
@ -1201,44 +1226,44 @@ bool ZooKeeper::isFeatureEnabled(KeeperFeatureFlag feature_flag) const
return keeper_feature_flags.isEnabled(feature_flag);
}
void ZooKeeper::initFeatureFlags()
std::optional<String> ZooKeeper::tryGetSystemZnode(const std::string & path, const std::string & description)
{
const auto try_get = [&](const std::string & path, const std::string & description) -> std::optional<std::string>
auto promise = std::make_shared<std::promise<Coordination::GetResponse>>();
auto future = promise->get_future();
auto callback = [promise](const Coordination::GetResponse & response) mutable
{
auto promise = std::make_shared<std::promise<Coordination::GetResponse>>();
auto future = promise->get_future();
auto callback = [promise](const Coordination::GetResponse & response) mutable
{
promise->set_value(response);
};
get(path, std::move(callback), {});
if (future.wait_for(std::chrono::milliseconds(args.operation_timeout_ms)) != std::future_status::ready)
throw Exception(Error::ZOPERATIONTIMEOUT, "Failed to get {}: timeout", description);
auto response = future.get();
if (response.error == Coordination::Error::ZNONODE)
{
LOG_TRACE(log, "Failed to get {}", description);
return std::nullopt;
}
else if (response.error != Coordination::Error::ZOK)
{
throw Exception(response.error, "Failed to get {}", description);
}
return std::move(response.data);
promise->set_value(response);
};
if (auto feature_flags = try_get(keeper_api_feature_flags_path, "feature flags"); feature_flags.has_value())
get(path, std::move(callback), {});
if (future.wait_for(std::chrono::milliseconds(args.operation_timeout_ms)) != std::future_status::ready)
throw Exception(Error::ZOPERATIONTIMEOUT, "Failed to get {}: timeout", description);
auto response = future.get();
if (response.error == Coordination::Error::ZNONODE)
{
LOG_TRACE(log, "Failed to get {}", description);
return std::nullopt;
}
else if (response.error != Coordination::Error::ZOK)
{
throw Exception(response.error, "Failed to get {}", description);
}
return std::move(response.data);
}
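tryGetSystemZnode bridges the callback-style get() to a synchronous read with a timeout through a shared promise/future pair. A minimal standalone sketch of the same bridge, with a hypothetical async API backed by a worker thread:

#include <chrono>
#include <cstdio>
#include <functional>
#include <future>
#include <string>
#include <thread>

struct Response { int error = 0; std::string data; };

// Simulated callback-style API: delivers the response from another thread.
static std::thread asyncGet(const std::string & path, std::function<void(Response)> callback)
{
    return std::thread([path, callback = std::move(callback)]
    {
        callback(Response{0, "az2 (contents of " + path + ")"});
    });
}

int main()
{
    auto promise = std::make_shared<std::promise<Response>>();
    auto future = promise->get_future();
    auto worker = asyncGet("/keeper/availability_zone",
                           [promise](Response r) { promise->set_value(std::move(r)); });

    if (future.wait_for(std::chrono::seconds(1)) == std::future_status::ready)
        std::printf("%s\n", future.get().data.c_str());
    else
        std::fprintf(stderr, "timed out\n"); // the real code throws ZOPERATIONTIMEOUT
    worker.join();
}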
void ZooKeeper::initFeatureFlags()
{
if (auto feature_flags = tryGetSystemZnode(keeper_api_feature_flags_path, "feature flags"); feature_flags.has_value())
{
keeper_feature_flags.setFeatureFlags(std::move(*feature_flags));
return;
}
auto keeper_api_version_string = try_get(keeper_api_version_path, "API version");
auto keeper_api_version_string = tryGetSystemZnode(keeper_api_version_path, "API version");
DB::KeeperApiVersion keeper_api_version{DB::KeeperApiVersion::ZOOKEEPER_COMPATIBLE};
@ -1256,6 +1281,17 @@ void ZooKeeper::initFeatureFlags()
keeper_feature_flags.fromApiVersion(keeper_api_version);
}
String ZooKeeper::tryGetAvailabilityZone()
{
auto res = tryGetSystemZnode(keeper_availability_zone_path, "availability zone");
if (res)
{
LOG_TRACE(log, "Availability zone for ZooKeeper at {}: {}", getConnectedHostPort(), *res);
return *res;
}
return "";
}
void ZooKeeper::executeGenericRequest(
const ZooKeeperRequestPtr & request,
@ -1587,17 +1623,6 @@ void ZooKeeper::setupFaultDistributions()
inject_setup.test_and_set();
}
void ZooKeeper::checkSessionDeadline() const
{
if (unlikely(hasReachedDeadline()))
throw Exception::fromMessage(Error::ZSESSIONEXPIRED, "Session expired (force expiry client-side)");
}
bool ZooKeeper::hasReachedDeadline() const
{
return client_session_deadline.has_value() && clock::now() >= client_session_deadline.value();
}
void ZooKeeper::maybeInjectSendFault()
{
if (unlikely(inject_setup.test() && send_inject_fault && send_inject_fault.value()(thread_local_rng)))


@ -8,6 +8,7 @@
#include <Common/ZooKeeper/IKeeper.h>
#include <Common/ZooKeeper/ZooKeeperCommon.h>
#include <Common/ZooKeeper/ZooKeeperArgs.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Coordination/KeeperConstants.h>
#include <Coordination/KeeperFeatureFlags.h>
@ -102,21 +103,12 @@ using namespace DB;
class ZooKeeper final : public IKeeper
{
public:
struct Node
{
Poco::Net::SocketAddress address;
UInt8 original_index;
bool secure;
};
using Nodes = std::vector<Node>;
/** Connections to the nodes are attempted in order. Shuffle them beforehand if you want a different order.
* The operation timeout must not be greater than the session timeout.
* The operation timeout applies independently to network reads, network writes, waiting for events, and synchronization.
*/
ZooKeeper(
const Nodes & nodes,
const zkutil::ShuffleHosts & nodes,
const zkutil::ZooKeeperArgs & args_,
std::shared_ptr<ZooKeeperLog> zk_log_);
@ -130,9 +122,7 @@ public:
String getConnectedHostPort() const override { return (original_index == -1) ? "" : args.hosts[original_index]; }
int32_t getConnectionXid() const override { return next_xid.load(); }
/// A ZooKeeper session can have an optional deadline set on it.
/// After it has been reached, the session needs to be finalized.
bool hasReachedDeadline() const override;
String tryGetAvailabilityZone() override;
/// Useful to check owner of ephemeral node.
int64_t getSessionID() const override { return session_id; }
@ -271,7 +261,6 @@ private:
clock::time_point time;
};
std::optional<clock::time_point> client_session_deadline {};
using RequestsQueue = ConcurrentBoundedQueue<RequestInfo>;
RequestsQueue requests_queue{1024};
@ -316,7 +305,7 @@ private:
LoggerPtr log;
void connect(
const Nodes & node,
const zkutil::ShuffleHosts & node,
Poco::Timespan connection_timeout);
void sendHandshake();
@ -346,9 +335,10 @@ private:
void logOperationIfNeeded(const ZooKeeperRequestPtr & request, const ZooKeeperResponsePtr & response = nullptr, bool finalize = false, UInt64 elapsed_microseconds = 0);
std::optional<String> tryGetSystemZnode(const std::string & path, const std::string & description);
void initFeatureFlags();
void checkSessionDeadline() const;
CurrentMetrics::Increment active_session_metric_increment{CurrentMetrics::ZooKeeperSession};
std::shared_ptr<ZooKeeperLog> zk_log;


@ -25,24 +25,24 @@ try
Poco::Logger::root().setChannel(channel);
Poco::Logger::root().setLevel("trace");
std::string hosts_arg = argv[1];
std::vector<std::string> hosts_strings;
splitInto<','>(hosts_strings, hosts_arg);
ZooKeeper::Nodes nodes;
nodes.reserve(hosts_strings.size());
for (size_t i = 0; i < hosts_strings.size(); ++i)
zkutil::ZooKeeperArgs args{argv[1]};
zkutil::ShuffleHosts nodes;
nodes.reserve(args.hosts.size());
for (size_t i = 0; i < args.hosts.size(); ++i)
{
std::string host_string = hosts_strings[i];
bool secure = startsWith(host_string, "secure://");
zkutil::ShuffleHost node;
std::string host_string = args.hosts[i];
node.secure = startsWith(host_string, "secure://");
if (secure)
if (node.secure)
host_string.erase(0, strlen("secure://"));
nodes.emplace_back(ZooKeeper::Node{Poco::Net::SocketAddress{host_string}, static_cast<UInt8>(i) , secure});
node.host = host_string;
node.original_index = i;
nodes.emplace_back(node);
}
zkutil::ZooKeeperArgs args;
ZooKeeper zk(nodes, args, nullptr);
Poco::Event event(true);


@ -9,6 +9,21 @@ namespace ErrorCodes
extern const int UNSUPPORTED_METHOD;
}
namespace S3
{
std::string tryGetRunningAvailabilityZone()
{
try
{
return getRunningAvailabilityZone();
}
catch (...)
{
tryLogCurrentException("tryGetRunningAvailabilityZone");
return "";
}
}
}
}
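The new tryGetRunningAvailabilityZone follows the usual try-prefix convention: the throwing variant stays available for callers that must fail hard, while the wrapper logs and degrades to an empty string for best-effort callers such as placement detection at startup. A standalone sketch of the convention, with hypothetical names:

#include <cstdio>
#include <stdexcept>
#include <string>

static std::string getZone() { throw std::runtime_error("no metadata endpoint"); }

static std::string tryGetZone()
{
    try
    {
        return getZone();
    }
    catch (const std::exception & e)
    {
        std::fprintf(stderr, "tryGetZone: %s\n", e.what()); // log and carry on
        return "";
    }
}

int main() { std::printf("[%s]\n", tryGetZone().c_str()); } // prints []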
#if USE_AWS_S3


@ -24,6 +24,7 @@ static inline constexpr char GCP_METADATA_SERVICE_ENDPOINT[] = "http://metadata.
/// getRunningAvailabilityZone returns the availability zone of the underlying compute resources where the current process runs.
std::string getRunningAvailabilityZone();
std::string tryGetRunningAvailabilityZone();
class AWSEC2MetadataClient : public Aws::Internal::AWSHttpResourceClient
{
@ -195,6 +196,7 @@ namespace DB
namespace S3
{
std::string getRunningAvailabilityZone();
std::string tryGetRunningAvailabilityZone();
}
}


@ -3402,8 +3402,6 @@ zkutil::ZooKeeperPtr Context::getZooKeeper() const
const auto & config = shared->zookeeper_config ? *shared->zookeeper_config : getConfigRef();
if (!shared->zookeeper)
shared->zookeeper = zkutil::ZooKeeper::create(config, zkutil::getZooKeeperConfigName(config), getZooKeeperLog());
else if (shared->zookeeper->hasReachedDeadline())
shared->zookeeper->finalize("ZooKeeper session has reached its deadline");
if (shared->zookeeper->expired())
{


@ -11,6 +11,11 @@
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
namespace PlacementInfo
{
@ -46,7 +51,15 @@ PlacementInfo & PlacementInfo::instance()
}
void PlacementInfo::initialize(const Poco::Util::AbstractConfiguration & config)
try
{
if (!config.has(DB::PlacementInfo::PLACEMENT_CONFIG_PREFIX))
{
availability_zone = "";
initialized = true;
return;
}
use_imds = config.getBool(getConfigPath("use_imds"), false);
if (use_imds)
@ -67,14 +80,17 @@ void PlacementInfo::initialize(const Poco::Util::AbstractConfiguration & config)
LOG_DEBUG(log, "Loaded info: availability_zone: {}", availability_zone);
initialized = true;
}
catch (...)
{
tryLogCurrentException("Failed to get availability zone");
availability_zone = "";
initialized = true;
}
std::string PlacementInfo::getAvailabilityZone() const
{
if (!initialized)
{
LOG_WARNING(log, "Placement info has not been loaded");
return "";
}
throw Exception(ErrorCodes::LOGICAL_ERROR, "Placement info has not been loaded");
return availability_zone;
}
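The semantics change here: initialize() now always completes and marks the object initialized, recording an empty availability zone when no <placement> section exists or when detection fails, and getAvailabilityZone() turns "called before initialize()" into a hard LOGICAL_ERROR instead of a logged warning. A standalone sketch of the fail-fast shape, with a hypothetical Placement type:

#include <cstdio>
#include <stdexcept>
#include <string>

class Placement
{
public:
    void initialize(const std::string * config_az) // nullptr stands for "no <placement> section"
    {
        availability_zone = config_az ? *config_az : "";
        initialized = true; // set unconditionally, unlike the old guarded version
    }
    const std::string & getAvailabilityZone() const
    {
        if (!initialized)
            throw std::logic_error("Placement info has not been loaded"); // LOGICAL_ERROR analogue
        return availability_zone;
    }
private:
    bool initialized = false;
    std::string availability_zone;
};

int main()
{
    Placement p;
    p.initialize(nullptr);
    std::printf("[%s]\n", p.getAvailabilityZone().c_str()); // prints []
}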


@ -36,7 +36,8 @@ ColumnsDescription StorageSystemZooKeeperConnection::getColumnsDescription()
/* 9 */ {"xid", std::make_shared<DataTypeInt32>(), "XID of the current session."},
/* 10*/ {"enabled_feature_flags", std::make_shared<DataTypeArray>(std::move(feature_flags_enum)),
"Feature flags which are enabled. Only applicable to ClickHouse Keeper."
}
},
/* 11*/ {"availability_zone", std::make_shared<DataTypeString>(), "Availability zone"},
};
}
@ -85,6 +86,7 @@ void StorageSystemZooKeeperConnection::fillData(MutableColumns & res_columns, Co
columns[8]->insert(zookeeper->getClientID());
columns[9]->insert(zookeeper->getConnectionXid());
add_enabled_feature_flags(zookeeper);
columns[11]->insert(zookeeper->getConnectedHostAvailabilityZone());
}
};


@ -0,0 +1,35 @@
<clickhouse>
<zookeeper>
<!--<zookeeper_load_balancing> random / in_order / nearest_hostname / first_or_random / round_robin </zookeeper_load_balancing>-->
<zookeeper_load_balancing>random</zookeeper_load_balancing>
<prefer_local_availability_zone>1</prefer_local_availability_zone>
<fallback_session_lifetime>
<min>0</min>
<max>1</max>
</fallback_session_lifetime>
<node index="1">
<host>zoo1</host>
<port>2181</port>
<availability_zone>az1</availability_zone>
</node>
<node index="2">
<host>zoo2</host>
<port>2181</port>
<availability_zone>az2</availability_zone>
</node>
<node index="3">
<host>zoo3</host>
<port>2181</port>
<availability_zone>az3</availability_zone>
</node>
<session_timeout_ms>3000</session_timeout_ms>
</zookeeper>
<placement>
<use_imds>0</use_imds>
<availability_zone>az2</availability_zone>
</placement>
</clickhouse>


@ -1,6 +1,8 @@
import time
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.network import PartitionManager
from helpers.test_tools import assert_eq_with_retry
cluster = ClickHouseCluster(
__file__, zookeeper_config_path="configs/zookeeper_load_balancing.xml"
@ -17,6 +19,10 @@ node3 = cluster.add_instance(
"nod3", with_zookeeper=True, main_configs=["configs/zookeeper_load_balancing.xml"]
)
node4 = cluster.add_instance(
"nod4", with_zookeeper=True, main_configs=["configs/zookeeper_load_balancing2.xml"]
)
def change_balancing(old, new, reload=True):
line = "<zookeeper_load_balancing>{}<"
@ -405,113 +411,57 @@ def test_hostname_levenshtein_distance(started_cluster):
def test_round_robin(started_cluster):
pm = PartitionManager()
try:
pm._add_rule(
{
"source": node1.ip_address,
"destination": cluster.get_instance_ip("zoo1"),
"action": "REJECT --reject-with tcp-reset",
}
)
pm._add_rule(
{
"source": node2.ip_address,
"destination": cluster.get_instance_ip("zoo1"),
"action": "REJECT --reject-with tcp-reset",
}
)
pm._add_rule(
{
"source": node3.ip_address,
"destination": cluster.get_instance_ip("zoo1"),
"action": "REJECT --reject-with tcp-reset",
}
)
change_balancing("random", "round_robin")
print(
str(
node1.exec_in_container(
[
"bash",
"-c",
"lsof -a -i4 -i6 -itcp -w | grep ':2181' | grep ESTABLISHED",
],
privileged=True,
user="root",
)
for node in [node1, node2, node3]:
idx = int(
node.query("select index from system.zookeeper_connection").strip()
)
)
assert (
"1"
== str(
node1.exec_in_container(
[
"bash",
"-c",
"lsof -a -i4 -i6 -itcp -w | grep 'testzookeeperconfigloadbalancing_zoo2_1.*testzookeeperconfigloadbalancing_default:2181' | grep ESTABLISHED | wc -l",
],
privileged=True,
user="root",
)
).strip()
)
new_idx = (idx + 1) % 3
print(
str(
node2.exec_in_container(
[
"bash",
"-c",
"lsof -a -i4 -i6 -itcp -w | grep ':2181' | grep ESTABLISHED",
],
privileged=True,
user="root",
)
pm._add_rule(
{
"source": node.ip_address,
"destination": cluster.get_instance_ip("zoo" + str(idx + 1)),
"action": "REJECT --reject-with tcp-reset",
}
)
)
assert (
"1"
== str(
node2.exec_in_container(
[
"bash",
"-c",
"lsof -a -i4 -i6 -itcp -w | grep 'testzookeeperconfigloadbalancing_zoo2_1.*testzookeeperconfigloadbalancing_default:2181' | grep ESTABLISHED | wc -l",
],
privileged=True,
user="root",
)
).strip()
)
print(
str(
node3.exec_in_container(
[
"bash",
"-c",
"lsof -a -i4 -i6 -itcp -w | grep ':2181' | grep ESTABLISHED",
],
privileged=True,
user="root",
)
assert_eq_with_retry(
node,
"select index from system.zookeeper_connection",
str(new_idx) + "\n",
)
)
assert (
"1"
== str(
node3.exec_in_container(
[
"bash",
"-c",
"lsof -a -i4 -i6 -itcp -w | grep 'testzookeeperconfigloadbalancing_zoo2_1.*testzookeeperconfigloadbalancing_default:2181' | grep ESTABLISHED | wc -l",
],
privileged=True,
user="root",
)
).strip()
)
pm.heal_all()
finally:
pm.heal_all()
change_balancing("round_robin", "random", reload=False)
def test_az(started_cluster):
pm = PartitionManager()
try:
# make sure it disconnects from the optimal node
pm._add_rule(
{
"source": node4.ip_address,
"destination": cluster.get_instance_ip("zoo2"),
"action": "REJECT --reject-with tcp-reset",
}
)
node4.query_with_retry("select * from system.zookeeper where path='/'")
assert "az2\n" != node4.query(
"select availability_zone from system.zookeeper_connection"
)
# fallback_session_lifetime.max is 1 second, but it shouldn't drop the current session until the node becomes available again
time.sleep(5) # this is fine
assert 5 <= int(node4.query("select zookeeperSessionUptime()").strip())
pm.heal_all()
assert_eq_with_retry(
node4, "select availability_zone from system.zookeeper_connection", "az2\n"
)
finally:
pm.heal_all()


@ -84,10 +84,28 @@ def test_fallback_session(started_cluster: ClickHouseCluster):
)
# at this point network partitioning has been reverted.
# the nodes should switch to zoo1 automatically because of `in_order` load-balancing.
# the nodes should switch to zoo1 because of `in_order` load-balancing.
# otherwise they would connect to a random replica
# but there's no reason to reconnect while the current session works,
# and there's no "optimal" node with `in_order` load-balancing,
# so we need to break the current session
for node in [node1, node2, node3]:
assert_uses_zk_node(node, "zoo1")
assert_uses_zk_node(node, "zoo3")
with PartitionManager() as pm:
for node in started_cluster.instances.values():
pm._add_rule(
{
"source": node.ip_address,
"destination": cluster.get_instance_ip("zoo3"),
"action": "REJECT --reject-with tcp-reset",
}
)
for node in [node1, node2, node3]:
assert_uses_zk_node(node, "zoo1")
node1.query_with_retry("INSERT INTO simple VALUES ({0}, {0})".format(2))
for node in [node2, node3]:


@ -1238,9 +1238,13 @@ void Runner::createConnections()
std::shared_ptr<Coordination::ZooKeeper> Runner::getConnection(const ConnectionInfo & connection_info, size_t connection_info_idx)
{
Coordination::ZooKeeper::Node node{Poco::Net::SocketAddress{connection_info.host}, static_cast<UInt8>(connection_info_idx), connection_info.secure};
std::vector<Coordination::ZooKeeper::Node> nodes;
nodes.push_back(node);
zkutil::ShuffleHost host;
host.host = connection_info.host;
host.secure = connection_info.secure;
host.original_index = static_cast<UInt8>(connection_info_idx);
host.address = Poco::Net::SocketAddress{connection_info.host};
zkutil::ShuffleHosts nodes{host};
zkutil::ZooKeeperArgs args;
args.session_timeout_ms = connection_info.session_timeout_ms;
args.connection_timeout_ms = connection_info.connection_timeout_ms;