improve ZooKeeper load balancing

Alexander Tokmakov 2024-06-23 01:58:28 +02:00
parent 180ada0622
commit dbdf4e1880
16 changed files with 380 additions and 123 deletions

View File

@ -60,4 +60,23 @@ GetPriorityForLoadBalancing::getPriorityFunc(LoadBalancing load_balance, size_t
return get_priority;
}
bool GetPriorityForLoadBalancing::hasOptimalNode() const
{
switch (load_balancing)
{
case LoadBalancing::NEAREST_HOSTNAME:
return true;
case LoadBalancing::HOSTNAME_LEVENSHTEIN_DISTANCE:
return true;
case LoadBalancing::IN_ORDER:
return false;
case LoadBalancing::RANDOM:
return false;
case LoadBalancing::FIRST_OR_RANDOM:
return true;
case LoadBalancing::ROUND_ROBIN:
return false;
}
}
}
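
A minimal sketch of how the new hasOptimalNode() check is meant to be consumed (illustration only; mayBenefitFromReconnecting and client_availability_zone are hypothetical names, the real check sits in ZooKeeper::init() further down): a reconnect to a preferred host is only worth scheduling when either an availability zone is configured or the policy itself defines a best host.

#include <string>
#include <Common/GetPriorityForLoadBalancing.h>

/// Illustration only: hypothetical helper mirroring the decision in ZooKeeper::init().
/// random/in_order/round_robin have no single best host, so tearing down a working
/// session to "rebalance" would gain nothing unless an availability zone is set.
bool mayBenefitFromReconnecting(const DB::GetPriorityForLoadBalancing & balancing,
                                const std::string & client_availability_zone)
{
    bool respect_az = !client_availability_zone.empty();
    return respect_az || balancing.hasOptimalNode();
}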

View File

@ -1,6 +1,7 @@
#pragma once
#include <Core/SettingsEnums.h>
#include <Common/thread_local_rng.h>
namespace DB
{
@ -13,6 +14,7 @@ public:
explicit GetPriorityForLoadBalancing(LoadBalancing load_balancing_, size_t last_used_ = 0)
: load_balancing(load_balancing_), last_used(last_used_)
{
saved_offset = thread_local_rng();
}
GetPriorityForLoadBalancing() = default;
@ -29,6 +31,12 @@ public:
}
Func getPriorityFunc(LoadBalancing load_balance, size_t offset, size_t pool_size) const;
Func getPriorityFunc(size_t pool_size) const
{
return getPriorityFunc(load_balancing, saved_offset % pool_size, pool_size);
}
bool hasOptimalNode() const;
std::vector<size_t> hostname_prefix_distance; /// Prefix distances from name of this host to the names of hosts of pools.
std::vector<size_t> hostname_levenshtein_distance; /// Levenshtein Distances from name of this host to the names of hosts of pools.
@ -37,6 +45,7 @@ public:
private:
mutable size_t last_used = 0; /// Last used for round_robin policy.
size_t saved_offset = 0; /// Default random offset for round_robin policy.
};
}
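
The parameterless getPriorityFunc() overload added here reuses saved_offset, drawn once from thread_local_rng in the constructor, so every shuffle performed through the same GetPriorityForLoadBalancing object keeps the same random starting point. A sketch under the assumption that FIRST_OR_RANDOM still maps the offset to a single preferred index (illustration only):

#include <cstddef>
#include <Common/GetPriorityForLoadBalancing.h>

/// Illustration only, assuming FIRST_OR_RANDOM keeps its offset-based priority function:
/// the preferred index is saved_offset % pool_size, random per object, stable per session.
void sketchStablePreferredHost()
{
    DB::GetPriorityForLoadBalancing balancing(DB::LoadBalancing::FIRST_OR_RANDOM);
    auto get_priority = balancing.getPriorityFunc(/*pool_size=*/ 3);
    /// Exactly one index in [0, 3) gets the best priority; calling getPriorityFunc(3)
    /// again on the same object selects the same index, unlike a fresh random draw.
    if (get_priority)
        for (size_t i = 0; i < 3; ++i)
            (void) get_priority(i);
}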

View File

@ -559,6 +559,8 @@ public:
/// Useful to check owner of ephemeral node.
virtual int64_t getSessionID() const = 0;
virtual String tryGetAvailabilityZone() { return ""; }
/// If the method throws an exception, callbacks won't be called.
///
/// After the method is executed successfully, you must wait for callbacks
@ -635,10 +637,6 @@ public:
virtual const DB::KeeperFeatureFlags * getKeeperFeatureFlags() const { return nullptr; }
/// A ZooKeeper session can have an optional deadline set on it.
/// After it has been reached, the session needs to be finalized.
virtual bool hasReachedDeadline() const = 0;
/// Expire session and finish all pending requests
virtual void finalize(const String & reason) = 0;
};

View File

@ -39,7 +39,6 @@ public:
~TestKeeper() override;
bool isExpired() const override { return expired; }
bool hasReachedDeadline() const override { return false; }
Int8 getConnectedNodeIdx() const override { return 0; }
String getConnectedHostPort() const override { return "TestKeeper:0000"; }
int32_t getConnectionXid() const override { return 0; }

View File

@ -8,6 +8,7 @@
#include <functional>
#include <ranges>
#include <vector>
#include <chrono>
#include <Common/ZooKeeper/Types.h>
#include <Common/ZooKeeper/ZooKeeperCommon.h>
@ -16,10 +17,12 @@
#include <base/sort.h>
#include <base/getFQDNOrHostName.h>
#include <Core/ServerUUID.h>
#include <Core/BackgroundSchedulePool.h>
#include "Common/ZooKeeper/IKeeper.h"
#include <Common/DNSResolver.h>
#include <Common/StringUtils.h>
#include <Common/Exception.h>
#include <Interpreters/Context.h>
#include <Poco/Net/NetException.h>
#include <Poco/Net/DNS.h>
@ -55,70 +58,174 @@ static void check(Coordination::Error code, const std::string & path)
throw KeeperException::fromPath(code, path);
}
UInt64 getSecondsUntilReconnect(const ZooKeeperArgs & args)
{
std::uniform_int_distribution<UInt32> fallback_session_lifetime_distribution
{
args.fallback_session_lifetime.min_sec,
args.fallback_session_lifetime.max_sec,
};
UInt32 session_lifetime_seconds = fallback_session_lifetime_distribution(thread_local_rng);
return session_lifetime_seconds;
}
void ZooKeeper::init(ZooKeeperArgs args_)
Coordination::ZooKeeper::Node hostToNode(const LoggerPtr & log, const ShuffleHost & host)
{
/// We want to resolve all hosts without DNS cache for keeper connection.
Coordination::DNSResolver::instance().removeHostFromCache(host.host);
const Poco::Net::SocketAddress host_socket_addr{host.host};
LOG_TEST(log, "Adding ZooKeeper host {} ({})", host.host, host_socket_addr.toString());
return Coordination::ZooKeeper::Node{host_socket_addr, host.original_index, host.secure};
}
/// TODO get rid of this, converting "host" to "node" is stupid
Coordination::ZooKeeper::Nodes hostsToNodes(const LoggerPtr & log, std::vector<ShuffleHost> & shuffled_hosts)
{
Coordination::ZooKeeper::Nodes nodes;
bool dns_error = false;
for (auto & host : shuffled_hosts)
{
auto & host_string = host.host;
try
{
host.secure = startsWith(host_string, "secure://");
if (host.secure)
host_string.erase(0, strlen("secure://"));
nodes.emplace_back(hostToNode(log, host));
}
catch (const Poco::Net::HostNotFoundException & e)
{
/// Most likely it's misconfiguration and wrong hostname was specified
LOG_ERROR(log, "Cannot use ZooKeeper host {}, reason: {}", host_string, e.displayText());
}
catch (const Poco::Net::DNSException & e)
{
/// Most likely DNS is not available now
dns_error = true;
LOG_ERROR(log, "Cannot use ZooKeeper host {} due to DNS error: {}", host_string, e.displayText());
}
}
if (nodes.empty())
{
/// For DNS errors we throw exception with ZCONNECTIONLOSS code, so it will be considered as hardware error, not user error
if (dns_error)
throw KeeperException::fromMessage(
Coordination::Error::ZCONNECTIONLOSS, "Cannot resolve any of provided ZooKeeper hosts due to DNS error");
else
throw KeeperException::fromMessage(Coordination::Error::ZCONNECTIONLOSS, "Cannot use any of provided ZooKeeper nodes");
}
return nodes;
}
void ZooKeeper::updateAvailabilityZones()
{
std::vector<ShuffleHost> shuffled_hosts = shuffleHosts();
Coordination::ZooKeeper::Nodes nodes = hostsToNodes(log, shuffled_hosts);
for (const auto & node : nodes)
{
try
{
Coordination::ZooKeeper::Nodes single_node{node};
auto tmp_impl = std::make_unique<Coordination::ZooKeeper>(single_node, args, zk_log);
auto idx = node.original_index;
availability_zones[idx] = tmp_impl->tryGetAvailabilityZone();
LOG_DEBUG(log, "Got availability zone for {}: {}", args.hosts[idx], availability_zones[idx]);
}
catch (...)
{
DB::tryLogCurrentException(log, "Failed to get availability zone for " + node.address.toString());
}
}
}
void ZooKeeper::init(ZooKeeperArgs args_, std::unique_ptr<Coordination::IKeeper> existing_impl)
{
args = std::move(args_);
log = getLogger("ZooKeeper");
if (args.implementation == "zookeeper")
if (existing_impl)
{
chassert(args.implementation == "zookeeper");
impl = std::move(existing_impl);
LOG_INFO(log, "Switching to connection to a more optimal node {}", impl->getConnectedHostPort());
}
else if (args.implementation == "zookeeper")
{
if (args.hosts.empty())
throw KeeperException::fromMessage(Coordination::Error::ZBADARGUMENTS, "No hosts passed to ZooKeeper constructor.");
Coordination::ZooKeeper::Nodes nodes;
nodes.reserve(args.hosts.size());
chassert(args.availability_zones.size() == args.hosts.size());
if (availability_zones.empty())
{
/// availability_zones is empty on server startup or after config reloading
/// We will keep the az info when starting new sessions
availability_zones = args.availability_zones;
if (args.availability_zone_autodetect)
updateAvailabilityZones();
}
/// Shuffle the hosts to distribute the load among ZooKeeper nodes.
std::vector<ShuffleHost> shuffled_hosts = shuffleHosts();
bool dns_error = false;
for (auto & host : shuffled_hosts)
{
auto & host_string = host.host;
try
{
const bool secure = startsWith(host_string, "secure://");
if (secure)
host_string.erase(0, strlen("secure://"));
/// We want to resolve all hosts without DNS cache for keeper connection.
Coordination::DNSResolver::instance().removeHostFromCache(host_string);
const Poco::Net::SocketAddress host_socket_addr{host_string};
LOG_TEST(log, "Adding ZooKeeper host {} ({})", host_string, host_socket_addr.toString());
nodes.emplace_back(Coordination::ZooKeeper::Node{host_socket_addr, host.original_index, secure});
}
catch (const Poco::Net::HostNotFoundException & e)
{
/// Most likely it's misconfiguration and wrong hostname was specified
LOG_ERROR(log, "Cannot use ZooKeeper host {}, reason: {}", host_string, e.displayText());
}
catch (const Poco::Net::DNSException & e)
{
/// Most likely DNS is not available now
dns_error = true;
LOG_ERROR(log, "Cannot use ZooKeeper host {} due to DNS error: {}", host_string, e.displayText());
}
}
if (nodes.empty())
{
/// For DNS errors we throw exception with ZCONNECTIONLOSS code, so it will be considered as hardware error, not user error
if (dns_error)
throw KeeperException::fromMessage(Coordination::Error::ZCONNECTIONLOSS, "Cannot resolve any of provided ZooKeeper hosts due to DNS error");
else
throw KeeperException::fromMessage(Coordination::Error::ZCONNECTIONLOSS, "Cannot use any of provided ZooKeeper nodes");
}
Coordination::ZooKeeper::Nodes nodes = hostsToNodes(log, shuffled_hosts);
impl = std::make_unique<Coordination::ZooKeeper>(nodes, args, zk_log);
Int8 node_idx = impl->getConnectedNodeIdx();
if (args.chroot.empty())
LOG_TRACE(log, "Initialized, hosts: {}", fmt::join(args.hosts, ","));
else
LOG_TRACE(log, "Initialized, hosts: {}, chroot: {}", fmt::join(args.hosts, ","), args.chroot);
/// If the balancing strategy has an optimal node then it will be the first in the list
bool connected_to_suboptimal_node = node_idx != shuffled_hosts[0].original_index;
bool respect_az = !args.client_availability_zone.empty();
bool may_benefit_from_reconnecting = respect_az || args.get_priority_load_balancing.hasOptimalNode();
if (connected_to_suboptimal_node && may_benefit_from_reconnecting)
{
auto reconnect_timeout_sec = getSecondsUntilReconnect(args);
LOG_DEBUG(log, "Connected to a suboptimal ZooKeeper host ({}, index {})."
" To preserve balance in ZooKeeper usage, this ZooKeeper session will expire in {} seconds",
impl->getConnectedHostPort(), node_idx, reconnect_timeout_sec);
auto reconnect_task_holder = DB::Context::getGlobalContextInstance()->getSchedulePool().createTask("ZKReconnect", [this, optimal_host = shuffled_hosts[0]]()
{
try
{
LOG_DEBUG(log, "Trying to connect to a more optimal node {}", optimal_host.host);
Coordination::ZooKeeper::Nodes node;
node.emplace_back(hostToNode(log, optimal_host));
std::unique_ptr<Coordination::IKeeper> new_impl = std::make_unique<Coordination::ZooKeeper>(node, args, zk_log);
Int8 new_node_idx = new_impl->getConnectedNodeIdx();
/// Maybe the node was unavailable when getting AZs the first time, update just in case
if (args.availability_zone_autodetect)
{
availability_zones[new_node_idx] = new_impl->tryGetAvailabilityZone();
LOG_DEBUG(log, "Got availability zone for {}: {}", optimal_host.host, availability_zones[new_node_idx]);
}
optimal_impl = std::move(new_impl);
impl->finalize("Connected to a more optimal node");
}
catch (...)
{
LOG_WARNING(log, "Failed to connect to a more optimal ZooKeeper, will try again later: {}", DB::getCurrentExceptionMessage(/*with_stacktrace*/ false));
(*reconnect_task)->scheduleAfter(getSecondsUntilReconnect(args) * 1000);
}
});
reconnect_task = std::make_unique<DB::BackgroundSchedulePoolTaskHolder>(std::move(reconnect_task_holder));
(*reconnect_task)->activate();
(*reconnect_task)->scheduleAfter(reconnect_timeout_sec * 1000);
}
}
else if (args.implementation == "testkeeper")
{
@ -152,29 +259,47 @@ void ZooKeeper::init(ZooKeeperArgs args_)
}
}
ZooKeeper::~ZooKeeper()
{
if (reconnect_task)
(*reconnect_task)->deactivate();
}
ZooKeeper::ZooKeeper(const ZooKeeperArgs & args_, std::shared_ptr<DB::ZooKeeperLog> zk_log_)
: zk_log(std::move(zk_log_))
{
init(args_);
init(args_, /*existing_impl*/ {});
}
ZooKeeper::ZooKeeper(const ZooKeeperArgs & args_, std::shared_ptr<DB::ZooKeeperLog> zk_log_, Strings availability_zones_, std::unique_ptr<Coordination::IKeeper> existing_impl)
: availability_zones(std::move(availability_zones_)), zk_log(std::move(zk_log_))
{
if (availability_zones.size() != args_.hosts.size())
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Argument sizes mismatch: {} and {}", availability_zones.size(), args_.hosts.size());
init(args_, std::move(existing_impl));
}
ZooKeeper::ZooKeeper(const Poco::Util::AbstractConfiguration & config, const std::string & config_name, std::shared_ptr<DB::ZooKeeperLog> zk_log_)
: zk_log(std::move(zk_log_))
{
init(ZooKeeperArgs(config, config_name));
init(ZooKeeperArgs(config, config_name), /*existing_impl*/ {});
}
std::vector<ShuffleHost> ZooKeeper::shuffleHosts() const
{
std::function<Priority(size_t index)> get_priority = args.get_priority_load_balancing.getPriorityFunc(args.get_priority_load_balancing.load_balancing, 0, args.hosts.size());
std::function<Priority(size_t index)> get_priority = args.get_priority_load_balancing.getPriorityFunc(args.hosts.size());
std::vector<ShuffleHost> shuffle_hosts;
for (size_t i = 0; i < args.hosts.size(); ++i)
{
ShuffleHost shuffle_host;
shuffle_host.host = args.hosts[i];
shuffle_host.original_index = static_cast<UInt8>(i);
if (!availability_zones[i].empty())
shuffle_host.az_info = availability_zones[i] == args.client_availability_zone ? ShuffleHost::SAME : ShuffleHost::OTHER;
if (get_priority)
shuffle_host.priority = get_priority(i);
shuffle_host.randomize();
@ -1023,7 +1148,10 @@ ZooKeeperPtr ZooKeeper::create(const Poco::Util::AbstractConfiguration & config,
ZooKeeperPtr ZooKeeper::startNewSession() const
{
auto res = std::shared_ptr<ZooKeeper>(new ZooKeeper(args, zk_log));
if (reconnect_task)
(*reconnect_task)->deactivate();
auto res = std::shared_ptr<ZooKeeper>(new ZooKeeper(args, zk_log, availability_zones, std::move(optimal_impl)));
res->initSession();
return res;
}
@ -1456,6 +1584,11 @@ int32_t ZooKeeper::getConnectionXid() const
return impl->getConnectionXid();
}
String ZooKeeper::getConnectedHostAvailabilityZone() const
{
return availability_zones.at(impl->getConnectedNodeIdx());
}
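
getConnectedHostAvailabilityZone() exposes the zone of the node behind the current connection, looked up by the connected node's original index. A possible caller-side use, illustration only with a hypothetical helper name:

#include <Common/ZooKeeper/ZooKeeper.h>

/// Illustration only: true when the current session already lands in the client's
/// configured availability zone, i.e. there is no closer node worth reconnecting to.
bool connectedToClientZone(const zkutil::ZooKeeper & zookeeper, const String & client_availability_zone)
{
    return !client_availability_zone.empty()
        && zookeeper.getConnectedHostAvailabilityZone() == client_availability_zone;
}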
size_t getFailedOpIndex(Coordination::Error exception_code, const Coordination::Responses & responses)
{
if (responses.empty())

View File

@ -32,6 +32,7 @@ namespace DB
{
class ZooKeeperLog;
class ZooKeeperWithFaultInjection;
class BackgroundSchedulePoolTaskHolder;
namespace ErrorCodes
{
@ -48,8 +49,17 @@ constexpr size_t MULTI_BATCH_SIZE = 100;
struct ShuffleHost
{
enum AvailabilityZoneInfo
{
SAME = 0,
UNKNOWN = 1,
OTHER = 2,
};
String host;
bool secure = false;
UInt8 original_index = 0;
AvailabilityZoneInfo az_info = UNKNOWN;
Priority priority;
UInt64 random = 0;
@ -60,8 +70,8 @@ struct ShuffleHost
static bool compare(const ShuffleHost & lhs, const ShuffleHost & rhs)
{
return std::forward_as_tuple(lhs.priority, lhs.random)
< std::forward_as_tuple(rhs.priority, rhs.random);
return std::forward_as_tuple(lhs.az_info, lhs.priority, lhs.random)
< std::forward_as_tuple(rhs.az_info, rhs.priority, rhs.random);
}
};
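
Because az_info is the first tuple element and SAME (0) sorts before UNKNOWN (1) and OTHER (2), a host in the client's availability zone always ends up first, and priority/random only break ties within a zone group. A minimal ordering sketch (illustration only; host names and zones are assumed, mirroring the three-node test config at the end of this diff):

#include <vector>
#include <base/sort.h>
#include <Common/ZooKeeper/ZooKeeper.h>

/// Illustration only: with the az-aware comparator, the SAME host sorts first
/// regardless of the random tiebreakers assigned by randomize().
void sketchAzAwareOrdering()
{
    std::vector<zkutil::ShuffleHost> hosts(3);
    hosts[0].host = "zoo1"; hosts[0].az_info = zkutil::ShuffleHost::OTHER;
    hosts[1].host = "zoo2"; hosts[1].az_info = zkutil::ShuffleHost::SAME;
    hosts[2].host = "zoo3"; hosts[2].az_info = zkutil::ShuffleHost::UNKNOWN;
    for (auto & host : hosts)
        host.randomize();
    ::sort(hosts.begin(), hosts.end(), zkutil::ShuffleHost::compare);
    /// Resulting order: zoo2 (SAME), zoo3 (UNKNOWN), zoo1 (OTHER).
}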
@ -197,6 +207,9 @@ class ZooKeeper
explicit ZooKeeper(const ZooKeeperArgs & args_, std::shared_ptr<DB::ZooKeeperLog> zk_log_ = nullptr);
/// Allows to keep info about availability zones when starting a new session
ZooKeeper(const ZooKeeperArgs & args_, std::shared_ptr<DB::ZooKeeperLog> zk_log_, Strings availability_zones_, std::unique_ptr<Coordination::IKeeper> existing_impl);
/** Config of the form:
<zookeeper>
<node>
@ -228,6 +241,8 @@ public:
using Ptr = std::shared_ptr<ZooKeeper>;
using ErrorsList = std::initializer_list<Coordination::Error>;
~ZooKeeper();
std::vector<ShuffleHost> shuffleHosts() const;
static Ptr create(const Poco::Util::AbstractConfiguration & config,
@ -596,8 +611,6 @@ public:
UInt32 getSessionUptime() const { return static_cast<UInt32>(session_uptime.elapsedSeconds()); }
bool hasReachedDeadline() const { return impl->hasReachedDeadline(); }
uint64_t getSessionTimeoutMS() const { return args.session_timeout_ms; }
void setServerCompletelyStarted();
@ -606,6 +619,8 @@ public:
String getConnectedHostPort() const;
int32_t getConnectionXid() const;
String getConnectedHostAvailabilityZone() const;
const DB::KeeperFeatureFlags * getKeeperFeatureFlags() const { return impl->getKeeperFeatureFlags(); }
/// Checks that our session was not killed, and allows to avoid applying a request from an old lost session.
@ -625,7 +640,8 @@ public:
void addCheckSessionOp(Coordination::Requests & requests) const;
private:
void init(ZooKeeperArgs args_);
void init(ZooKeeperArgs args_, std::unique_ptr<Coordination::IKeeper> existing_impl);
void updateAvailabilityZones();
/// The following methods don't throw exceptions but return error codes.
Coordination::Error createImpl(const std::string & path, const std::string & data, int32_t mode, std::string & path_created);
@ -690,15 +706,20 @@ private:
}
std::unique_ptr<Coordination::IKeeper> impl;
mutable std::unique_ptr<Coordination::IKeeper> optimal_impl;
ZooKeeperArgs args;
Strings availability_zones;
LoggerPtr log = nullptr;
std::shared_ptr<DB::ZooKeeperLog> zk_log;
AtomicStopwatch session_uptime;
int32_t session_node_version;
std::unique_ptr<DB::BackgroundSchedulePoolTaskHolder> reconnect_task;
};

View File

@ -5,6 +5,7 @@
#include <Poco/Util/AbstractConfiguration.h>
#include <Common/isLocalAddress.h>
#include <Common/StringUtils.h>
#include <IO/S3/Credentials.h>
#include <Poco/String.h>
namespace DB
@ -144,6 +145,7 @@ void ZooKeeperArgs::initFromKeeperSection(const Poco::Util::AbstractConfiguratio
hosts.push_back(
(config.getBool(config_name + "." + key + ".secure", false) ? "secure://" : "")
+ config.getString(config_name + "." + key + ".host") + ":" + config.getString(config_name + "." + key + ".port", "2181"));
availability_zones.push_back(config.getString(config_name + "." + key + ".availability_zone", ""));
}
else if (key == "session_timeout_ms")
{
@ -199,6 +201,10 @@ void ZooKeeperArgs::initFromKeeperSection(const Poco::Util::AbstractConfiguratio
{
sessions_path = config.getString(config_name + "." + key);
}
else if (key == "client_availability_zone")
{
client_availability_zone = config.getString(config_name + "." + key);
}
else if (key == "implementation")
{
implementation = config.getString(config_name + "." + key);
@ -224,9 +230,16 @@ void ZooKeeperArgs::initFromKeeperSection(const Poco::Util::AbstractConfiguratio
{
use_compression = config.getBool(config_name + "." + key);
}
else if (key == "availability_zone_autodetect")
{
availability_zone_autodetect = config.getBool(config_name + "." + key);
}
else
throw KeeperException(Coordination::Error::ZBADARGUMENTS, "Unknown key {} in config file", key);
}
if (availability_zone_autodetect)
client_availability_zone = DB::S3::tryGetRunningAvailabilityZone();
}
}

View File

@ -32,10 +32,12 @@ struct ZooKeeperArgs
String zookeeper_name = "zookeeper";
String implementation = "zookeeper";
Strings hosts;
Strings availability_zones;
String auth_scheme;
String identity;
String chroot;
String sessions_path = "/clickhouse/sessions";
String client_availability_zone;
int32_t connection_timeout_ms = Coordination::DEFAULT_CONNECTION_TIMEOUT_MS;
int32_t session_timeout_ms = Coordination::DEFAULT_SESSION_TIMEOUT_MS;
int32_t operation_timeout_ms = Coordination::DEFAULT_OPERATION_TIMEOUT_MS;
@ -47,6 +49,7 @@ struct ZooKeeperArgs
UInt64 send_sleep_ms = 0;
UInt64 recv_sleep_ms = 0;
bool use_compression = false;
bool availability_zone_autodetect = false;
SessionLifetimeConfiguration fallback_session_lifetime = {};
DB::GetPriorityForLoadBalancing get_priority_load_balancing;

View File

@ -498,22 +498,6 @@ void ZooKeeper::connect(
}
original_index = static_cast<Int8>(node.original_index);
if (i != 0)
{
std::uniform_int_distribution<UInt32> fallback_session_lifetime_distribution
{
args.fallback_session_lifetime.min_sec,
args.fallback_session_lifetime.max_sec,
};
UInt32 session_lifetime_seconds = fallback_session_lifetime_distribution(thread_local_rng);
client_session_deadline = clock::now() + std::chrono::seconds(session_lifetime_seconds);
LOG_DEBUG(log, "Connected to a suboptimal ZooKeeper host ({}, index {})."
" To preserve balance in ZooKeeper usage, this ZooKeeper session will expire in {} seconds",
node.address.toString(), i, session_lifetime_seconds);
}
break;
}
catch (...)
@ -1153,7 +1137,6 @@ void ZooKeeper::pushRequest(RequestInfo && info)
{
try
{
checkSessionDeadline();
info.time = clock::now();
auto maybe_zk_log = std::atomic_load(&zk_log);
if (maybe_zk_log)
@ -1201,44 +1184,44 @@ bool ZooKeeper::isFeatureEnabled(KeeperFeatureFlag feature_flag) const
return keeper_feature_flags.isEnabled(feature_flag);
}
void ZooKeeper::initFeatureFlags()
std::optional<String> ZooKeeper::tryGetSystemZnode(const std::string & path, const std::string & description)
{
const auto try_get = [&](const std::string & path, const std::string & description) -> std::optional<std::string>
auto promise = std::make_shared<std::promise<Coordination::GetResponse>>();
auto future = promise->get_future();
auto callback = [promise](const Coordination::GetResponse & response) mutable
{
auto promise = std::make_shared<std::promise<Coordination::GetResponse>>();
auto future = promise->get_future();
auto callback = [promise](const Coordination::GetResponse & response) mutable
{
promise->set_value(response);
};
get(path, std::move(callback), {});
if (future.wait_for(std::chrono::milliseconds(args.operation_timeout_ms)) != std::future_status::ready)
throw Exception(Error::ZOPERATIONTIMEOUT, "Failed to get {}: timeout", description);
auto response = future.get();
if (response.error == Coordination::Error::ZNONODE)
{
LOG_TRACE(log, "Failed to get {}", description);
return std::nullopt;
}
else if (response.error != Coordination::Error::ZOK)
{
throw Exception(response.error, "Failed to get {}", description);
}
return std::move(response.data);
promise->set_value(response);
};
if (auto feature_flags = try_get(keeper_api_feature_flags_path, "feature flags"); feature_flags.has_value())
get(path, std::move(callback), {});
if (future.wait_for(std::chrono::milliseconds(args.operation_timeout_ms)) != std::future_status::ready)
throw Exception(Error::ZOPERATIONTIMEOUT, "Failed to get {}: timeout", description);
auto response = future.get();
if (response.error == Coordination::Error::ZNONODE)
{
LOG_TRACE(log, "Failed to get {}", description);
return std::nullopt;
}
else if (response.error != Coordination::Error::ZOK)
{
throw Exception(response.error, "Failed to get {}", description);
}
return std::move(response.data);
}
void ZooKeeper::initFeatureFlags()
{
if (auto feature_flags = tryGetSystemZnode(keeper_api_feature_flags_path, "feature flags"); feature_flags.has_value())
{
keeper_feature_flags.setFeatureFlags(std::move(*feature_flags));
return;
}
auto keeper_api_version_string = try_get(keeper_api_version_path, "API version");
auto keeper_api_version_string = tryGetSystemZnode(keeper_api_version_path, "API version");
DB::KeeperApiVersion keeper_api_version{DB::KeeperApiVersion::ZOOKEEPER_COMPATIBLE};
@ -1256,6 +1239,17 @@ void ZooKeeper::initFeatureFlags()
keeper_feature_flags.fromApiVersion(keeper_api_version);
}
String ZooKeeper::tryGetAvailabilityZone()
{
auto res = tryGetSystemZnode(keeper_availability_zone_path, "availability zone");
if (res)
{
LOG_TRACE(log, "Availability zone for ZooKeeper at {}: {}", getConnectedHostPort(), *res);
return *res;
}
return "";
}
void ZooKeeper::executeGenericRequest(
const ZooKeeperRequestPtr & request,
@ -1587,17 +1581,6 @@ void ZooKeeper::setupFaultDistributions()
inject_setup.test_and_set();
}
void ZooKeeper::checkSessionDeadline() const
{
if (unlikely(hasReachedDeadline()))
throw Exception::fromMessage(Error::ZSESSIONEXPIRED, "Session expired (force expiry client-side)");
}
bool ZooKeeper::hasReachedDeadline() const
{
return client_session_deadline.has_value() && clock::now() >= client_session_deadline.value();
}
void ZooKeeper::maybeInjectSendFault()
{
if (unlikely(inject_setup.test() && send_inject_fault && send_inject_fault.value()(thread_local_rng)))

View File

@ -130,9 +130,7 @@ public:
String getConnectedHostPort() const override { return (original_index == -1) ? "" : args.hosts[original_index]; }
int32_t getConnectionXid() const override { return next_xid.load(); }
/// A ZooKeeper session can have an optional deadline set on it.
/// After it has been reached, the session needs to be finalized.
bool hasReachedDeadline() const override;
String tryGetAvailabilityZone() override;
/// Useful to check owner of ephemeral node.
int64_t getSessionID() const override { return session_id; }
@ -271,7 +269,6 @@ private:
clock::time_point time;
};
std::optional<clock::time_point> client_session_deadline {};
using RequestsQueue = ConcurrentBoundedQueue<RequestInfo>;
RequestsQueue requests_queue{1024};
@ -346,9 +343,10 @@ private:
void logOperationIfNeeded(const ZooKeeperRequestPtr & request, const ZooKeeperResponsePtr & response = nullptr, bool finalize = false, UInt64 elapsed_microseconds = 0);
std::optional<String> tryGetSystemZnode(const std::string & path, const std::string & description);
void initFeatureFlags();
void checkSessionDeadline() const;
CurrentMetrics::Increment active_session_metric_increment{CurrentMetrics::ZooKeeperSession};
std::shared_ptr<ZooKeeperLog> zk_log;

View File

@ -9,6 +9,21 @@ namespace ErrorCodes
extern const int UNSUPPORTED_METHOD;
}
namespace S3
{
std::string tryGetRunningAvailabilityZone()
{
try
{
return getRunningAvailabilityZone();
}
catch (...)
{
tryLogCurrentException("tryGetRunningAvailabilityZone");
return "";
}
}
}
}
#if USE_AWS_S3

View File

@ -24,6 +24,7 @@ static inline constexpr char GCP_METADATA_SERVICE_ENDPOINT[] = "http://metadata.
/// getRunningAvailabilityZone returns the availability zone of the underlying compute resources where the current process runs.
std::string getRunningAvailabilityZone();
std::string tryGetRunningAvailabilityZone();
class AWSEC2MetadataClient : public Aws::Internal::AWSHttpResourceClient
{

View File

@ -3396,8 +3396,6 @@ zkutil::ZooKeeperPtr Context::getZooKeeper() const
const auto & config = shared->zookeeper_config ? *shared->zookeeper_config : getConfigRef();
if (!shared->zookeeper)
shared->zookeeper = zkutil::ZooKeeper::create(config, zkutil::getZooKeeperConfigName(config), getZooKeeperLog());
else if (shared->zookeeper->hasReachedDeadline())
shared->zookeeper->finalize("ZooKeeper session has reached its deadline");
if (shared->zookeeper->expired())
{

View File

@ -36,7 +36,8 @@ ColumnsDescription StorageSystemZooKeeperConnection::getColumnsDescription()
/* 9 */ {"xid", std::make_shared<DataTypeInt32>(), "XID of the current session."},
/* 10*/ {"enabled_feature_flags", std::make_shared<DataTypeArray>(std::move(feature_flags_enum)),
"Feature flags which are enabled. Only applicable to ClickHouse Keeper."
}
},
/* 11*/ {"availability_zone", std::make_shared<DataTypeString>(), "Availability zone"},
};
}
@ -85,6 +86,7 @@ void StorageSystemZooKeeperConnection::fillData(MutableColumns & res_columns, Co
columns[8]->insert(zookeeper->getClientID());
columns[9]->insert(zookeeper->getConnectionXid());
add_enabled_feature_flags(zookeeper);
columns[11]->insert(zookeeper->getConnectedHostAvailabilityZone());
}
};

View File

@ -0,0 +1,29 @@
<clickhouse>
<zookeeper>
<!--<zookeeper_load_balancing> random / in_order / nearest_hostname / first_or_random / round_robin </zookeeper_load_balancing>-->
<zookeeper_load_balancing>random</zookeeper_load_balancing>
<client_availability_zone>az2</client_availability_zone>
<fallback_session_lifetime>
<min>0</min>
<max>1</max>
</fallback_session_lifetime>
<node index="1">
<host>zoo1</host>
<port>2181</port>
<availability_zone>az1</availability_zone>
</node>
<node index="2">
<host>zoo2</host>
<port>2181</port>
<availability_zone>az2</availability_zone>
</node>
<node index="3">
<host>zoo3</host>
<port>2181</port>
<availability_zone>az3</availability_zone>
</node>
<session_timeout_ms>3000</session_timeout_ms>
</zookeeper>
</clickhouse>

View File

@ -1,6 +1,8 @@
import time
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.network import PartitionManager
from helpers.test_tools import assert_eq_with_retry
cluster = ClickHouseCluster(
__file__, zookeeper_config_path="configs/zookeeper_load_balancing.xml"
@ -17,6 +19,10 @@ node3 = cluster.add_instance(
"nod3", with_zookeeper=True, main_configs=["configs/zookeeper_load_balancing.xml"]
)
node4 = cluster.add_instance(
"nod4", with_zookeeper=True, main_configs=["configs/zookeeper_load_balancing2.xml"]
)
def change_balancing(old, new, reload=True):
line = "<zookeeper_load_balancing>{}<"
@ -515,3 +521,33 @@ def test_round_robin(started_cluster):
finally:
pm.heal_all()
change_balancing("round_robin", "random", reload=False)
def test_az(started_cluster):
pm = PartitionManager()
try:
# make sure it disconnects from the optimal node
pm._add_rule(
{
"source": node1.ip_address,
"destination": cluster.get_instance_ip("zoo2"),
"action": "REJECT --reject-with tcp-reset",
}
)
node4.query_with_retry("select * from system.zookeeper where path='/'")
assert "az2\n" != node4.query(
"select availability_zone from system.zookeeper_connection"
)
# fallback_session_lifetime.max is 1 second, but it shouldn't drop current session until the node becomes available
time.sleep(5) # this is fine
assert 5 <= int(node4.query("select zookeeperSessionUptime()").strip())
pm.heal_all()
assert_eq_with_retry(
node4, "select availability_zone from system.zookeeper_connection", "az2\n"
)
finally:
pm.heal_all()