diff --git a/programs/keeper-client/KeeperClient.cpp b/programs/keeper-client/KeeperClient.cpp
index 68adc2c2aac..a20c1f686f3 100644
--- a/programs/keeper-client/KeeperClient.cpp
+++ b/programs/keeper-client/KeeperClient.cpp
@@ -383,6 +383,9 @@ int KeeperClient::main(const std::vector<String> & /* args */)
 
     for (const auto & key : keys)
     {
+        if (key != "node")
+            continue;
+
         String prefix = "zookeeper." + key;
         String host = clickhouse_config.configuration->getString(prefix + ".host");
         String port = clickhouse_config.configuration->getString(prefix + ".port");
@@ -401,6 +404,7 @@ int KeeperClient::main(const std::vector<String> & /* args */)
         zk_args.hosts.push_back(host + ":" + port);
     }
 
+    zk_args.availability_zones.resize(zk_args.hosts.size());
     zk_args.connection_timeout_ms = config().getInt("connection-timeout", 10) * 1000;
     zk_args.session_timeout_ms = config().getInt("session-timeout", 10) * 1000;
     zk_args.operation_timeout_ms = config().getInt("operation-timeout", 10) * 1000;
diff --git a/programs/keeper/Keeper.cpp b/programs/keeper/Keeper.cpp
index bb04ff88936..f14ef2e5552 100644
--- a/programs/keeper/Keeper.cpp
+++ b/programs/keeper/Keeper.cpp
@@ -355,10 +355,7 @@ try
 
     std::string include_from_path = config().getString("include_from", "/etc/metrika.xml");
 
-    if (config().has(DB::PlacementInfo::PLACEMENT_CONFIG_PREFIX))
-    {
-        PlacementInfo::PlacementInfo::instance().initialize(config());
-    }
+    PlacementInfo::PlacementInfo::instance().initialize(config());
 
     GlobalThreadPool::initialize(
         /// We need a sufficient number of threads for connections + nuraft workers + keeper workers; 1000 is an estimate
diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp
index e2554a6ff03..4cb3b5f45c7 100644
--- a/programs/server/Server.cpp
+++ b/programs/server/Server.cpp
@@ -1003,6 +1003,8 @@ try
 
     ServerUUID::load(path / "uuid", log);
 
+    PlacementInfo::PlacementInfo::instance().initialize(config());
+
     zkutil::validateZooKeeperConfig(config());
     bool has_zookeeper = zkutil::hasZooKeeperConfig(config());
 
@@ -1817,11 +1819,6 @@ try
         }
 
-        if (config().has(DB::PlacementInfo::PLACEMENT_CONFIG_PREFIX))
-        {
-            PlacementInfo::PlacementInfo::instance().initialize(config());
-        }
-
         {
             std::lock_guard lock(servers_lock);
             /// We should start interserver communications before (and, more importantly, shut down after) tables.
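A note on the new `key != "node"` guard in KeeperClient: the children of a `<zookeeper>` config section are not only `<node>` entries; scalar settings such as `session_timeout_ms` live at the same level, and iterating the section's keys sees all of them (`ZooKeeperArgs::initFromKeeperSection` below branches over exactly these sibling keys). A dependency-free sketch of that enumeration; the key list here is made up, and note that Poco reports repeated elements as `node`, `node[1]`, `node[2]`, which is why ZooKeeperArgs matches them with `starts_with("node")`:

```cpp
#include <cstdio>
#include <string>
#include <vector>

int main()
{
    /// Hypothetical keys() result for a <zookeeper> section with two <node> entries
    std::vector<std::string> keys = {"node", "node[1]", "session_timeout_ms", "root"};
    for (const auto & key : keys)
    {
        if (!key.starts_with("node"))
            continue; /// skip scalar settings; keep only host entries
        std::printf("host entry: %s\n", key.c_str());
    }
}
```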
diff --git a/src/Common/GetPriorityForLoadBalancing.cpp b/src/Common/GetPriorityForLoadBalancing.cpp
index d4c6f89ff92..dc5704ef6b5 100644
--- a/src/Common/GetPriorityForLoadBalancing.cpp
+++ b/src/Common/GetPriorityForLoadBalancing.cpp
@@ -60,4 +60,26 @@ GetPriorityForLoadBalancing::getPriorityFunc(LoadBalancing load_balance, size_t
     return get_priority;
 }
 
+/// Some load balancing strategies (such as "nearest hostname") have preferred nodes to connect to.
+/// Usually it's a node in the same data center/availability zone.
+/// For other strategies there's no difference between nodes.
+bool GetPriorityForLoadBalancing::hasOptimalNode() const
+{
+    switch (load_balancing)
+    {
+        case LoadBalancing::NEAREST_HOSTNAME:
+            return true;
+        case LoadBalancing::HOSTNAME_LEVENSHTEIN_DISTANCE:
+            return true;
+        case LoadBalancing::IN_ORDER:
+            return false;
+        case LoadBalancing::RANDOM:
+            return false;
+        case LoadBalancing::FIRST_OR_RANDOM:
+            return true;
+        case LoadBalancing::ROUND_ROBIN:
+            return false;
+    }
+}
+
 }
diff --git a/src/Common/GetPriorityForLoadBalancing.h b/src/Common/GetPriorityForLoadBalancing.h
index 0de99730977..01dae9a1289 100644
--- a/src/Common/GetPriorityForLoadBalancing.h
+++ b/src/Common/GetPriorityForLoadBalancing.h
@@ -30,6 +30,8 @@ public:
 
     Func getPriorityFunc(LoadBalancing load_balance, size_t offset, size_t pool_size) const;
 
+    bool hasOptimalNode() const;
+
     std::vector<size_t> hostname_prefix_distance; /// Prefix distances from the name of this host to the names of the pools' hosts.
     std::vector<size_t> hostname_levenshtein_distance; /// Levenshtein distances from the name of this host to the names of the pools' hosts.
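For readers unfamiliar with the pattern: hasOptimalNode() is an exhaustive switch with no default clause, so adding a new LoadBalancing value produces a compiler diagnostic (-Wswitch) here instead of silently returning a wrong answer. A standalone illustration with a made-up enum, not the ClickHouse one:

```cpp
#include <cstdio>

enum class Strategy { NearestHostname, InOrder, Random, FirstOrRandom, RoundRobin };

/// Same shape as hasOptimalNode(): strategies with a preferred ("optimal") node
/// return true; for the rest, any node is as good as any other.
bool hasPreferredNode(Strategy s)
{
    switch (s)
    {
        case Strategy::NearestHostname: return true;
        case Strategy::FirstOrRandom:   return true;
        case Strategy::InOrder:         return false; /// an ordered fallback chain, not a preference
        case Strategy::Random:          return false;
        case Strategy::RoundRobin:      return false;
    }
    return false; /// unreachable while the switch stays exhaustive
}

int main()
{
    std::printf("%d\n", hasPreferredNode(Strategy::NearestHostname)); /// prints 1
}
```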
diff --git a/src/Common/ZooKeeper/IKeeper.h b/src/Common/ZooKeeper/IKeeper.h
index 7d574247aa5..2c6cbc4a5d5 100644
--- a/src/Common/ZooKeeper/IKeeper.h
+++ b/src/Common/ZooKeeper/IKeeper.h
@@ -559,6 +559,8 @@ public:
     /// Useful to check owner of ephemeral node.
     virtual int64_t getSessionID() const = 0;
 
+    virtual String tryGetAvailabilityZone() { return ""; }
+
     /// If the method throws an exception, callbacks won't be called.
     ///
     /// After the method is executed successfully, you must wait for callbacks
@@ -635,10 +637,6 @@ public:
 
     virtual const DB::KeeperFeatureFlags * getKeeperFeatureFlags() const { return nullptr; }
 
-    /// A ZooKeeper session can have an optional deadline set on it.
-    /// After it has been reached, the session needs to be finalized.
-    virtual bool hasReachedDeadline() const = 0;
-
     /// Expire session and finish all pending requests
     virtual void finalize(const String & reason) = 0;
 };
diff --git a/src/Common/ZooKeeper/TestKeeper.h b/src/Common/ZooKeeper/TestKeeper.h
index 2774055652c..2194ad015bf 100644
--- a/src/Common/ZooKeeper/TestKeeper.h
+++ b/src/Common/ZooKeeper/TestKeeper.h
@@ -39,7 +39,6 @@ public:
     ~TestKeeper() override;
 
     bool isExpired() const override { return expired; }
-    bool hasReachedDeadline() const override { return false; }
    Int8 getConnectedNodeIdx() const override { return 0; }
     String getConnectedHostPort() const override { return "TestKeeper:0000"; }
     int32_t getConnectionXid() const override { return 0; }
diff --git a/src/Common/ZooKeeper/ZooKeeper.cpp b/src/Common/ZooKeeper/ZooKeeper.cpp
index 4ec44a39136..56db9adb787 100644
--- a/src/Common/ZooKeeper/ZooKeeper.cpp
+++ b/src/Common/ZooKeeper/ZooKeeper.cpp
@@ -8,6 +8,7 @@
 #include <...>
 #include <...>
 #include <...>
+#include <Core/BackgroundSchedulePool.h>
 #include <...>
 #include <...>
 
@@ -16,10 +17,12 @@
 #include <...>
 #include <...>
 #include <...>
+#include <Interpreters/Context.h>
 #include "Common/ZooKeeper/IKeeper.h"
 #include <...>
 #include <...>
 #include <...>
+#include <Common/thread_local_rng.h>
 #include <...>
 #include <...>
 
@@ -55,70 +58,120 @@ static void check(Coordination::Error code, const std::string & path)
         throw KeeperException::fromPath(code, path);
 }
 
+UInt64 getSecondsUntilReconnect(const ZooKeeperArgs & args)
+{
+    std::uniform_int_distribution<UInt32> fallback_session_lifetime_distribution
+    {
+        args.fallback_session_lifetime.min_sec,
+        args.fallback_session_lifetime.max_sec,
+    };
+    UInt32 session_lifetime_seconds = fallback_session_lifetime_distribution(thread_local_rng);
+    return session_lifetime_seconds;
+}
 
-void ZooKeeper::init(ZooKeeperArgs args_)
+void ZooKeeper::updateAvailabilityZones()
+{
+    ShuffleHosts shuffled_hosts = shuffleHosts();
+
+    for (const auto & node : shuffled_hosts)
+    {
+        try
+        {
+            ShuffleHosts single_node{node};
+            auto tmp_impl = std::make_unique<Coordination::ZooKeeper>(single_node, args, zk_log);
+            auto idx = node.original_index;
+            availability_zones[idx] = tmp_impl->tryGetAvailabilityZone();
+            LOG_TEST(log, "Got availability zone for {}: {}", args.hosts[idx], availability_zones[idx]);
+        }
+        catch (...)
+        {
+            DB::tryLogCurrentException(log, "Failed to get availability zone for " + node.host);
+        }
+    }
+    LOG_DEBUG(log, "Updated availability zones: [{}]", fmt::join(availability_zones, ", "));
+}
+
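What getSecondsUntilReconnect() produces, as a standalone sketch (std::mt19937 stands in for thread_local_rng, and the struct mirrors fallback_session_lifetime): the lifetime of a session parked on a suboptimal node is drawn uniformly from [min_sec, max_sec], so a fleet of clients drifts back to the optimal node at staggered times rather than reconnecting all at once.

```cpp
#include <cstdint>
#include <cstdio>
#include <random>

struct FallbackLifetime { uint32_t min_sec; uint32_t max_sec; };

uint64_t secondsUntilReconnect(const FallbackLifetime & cfg, std::mt19937 & rng)
{
    std::uniform_int_distribution<uint32_t> dist{cfg.min_sec, cfg.max_sec}; /// inclusive bounds
    return dist(rng);
}

int main()
{
    std::mt19937 rng{std::random_device{}()};
    FallbackLifetime cfg{0, 3600};
    for (int i = 0; i < 3; ++i)
        std::printf("reconnect in %llu s\n", static_cast<unsigned long long>(secondsUntilReconnect(cfg, rng)));
}
```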
+void ZooKeeper::init(ZooKeeperArgs args_, std::unique_ptr<Coordination::IKeeper> existing_impl)
 {
     args = std::move(args_);
     log = getLogger("ZooKeeper");
 
-    if (args.implementation == "zookeeper")
+    if (existing_impl)
+    {
+        chassert(args.implementation == "zookeeper");
+        impl = std::move(existing_impl);
+        LOG_INFO(log, "Switching to connection to a more optimal node {}", impl->getConnectedHostPort());
+    }
+    else if (args.implementation == "zookeeper")
     {
         if (args.hosts.empty())
             throw KeeperException::fromMessage(Coordination::Error::ZBADARGUMENTS, "No hosts passed to ZooKeeper constructor.");
 
-        Coordination::ZooKeeper::Nodes nodes;
-        nodes.reserve(args.hosts.size());
+        chassert(args.availability_zones.size() == args.hosts.size());
+        if (availability_zones.empty())
+        {
+            /// availability_zones is empty on server startup or after config reloading;
+            /// we will keep the AZ info when starting new sessions.
+            availability_zones = args.availability_zones;
+            LOG_TEST(log, "Availability zones from config: [{}], client: {}", fmt::join(availability_zones, ", "), args.client_availability_zone);
+            if (args.availability_zone_autodetect)
+                updateAvailabilityZones();
+        }
+        chassert(availability_zones.size() == args.hosts.size());
 
         /// Shuffle the hosts to distribute the load among ZooKeeper nodes.
-        std::vector<ShuffleHost> shuffled_hosts = shuffleHosts();
+        ShuffleHosts shuffled_hosts = shuffleHosts();
 
-        bool dns_error = false;
-        for (auto & host : shuffled_hosts)
-        {
-            auto & host_string = host.host;
-            try
-            {
-                const bool secure = startsWith(host_string, "secure://");
-
-                if (secure)
-                    host_string.erase(0, strlen("secure://"));
-
-                /// We want to resolve all hosts without DNS cache for keeper connection.
-                Coordination::DNSResolver::instance().removeHostFromCache(host_string);
-
-                const Poco::Net::SocketAddress host_socket_addr{host_string};
-                LOG_TEST(log, "Adding ZooKeeper host {} ({})", host_string, host_socket_addr.toString());
-                nodes.emplace_back(Coordination::ZooKeeper::Node{host_socket_addr, host.original_index, secure});
-            }
-            catch (const Poco::Net::HostNotFoundException & e)
-            {
-                /// Most likely it's a misconfiguration and a wrong hostname was specified
-                LOG_ERROR(log, "Cannot use ZooKeeper host {}, reason: {}", host_string, e.displayText());
-            }
-            catch (const Poco::Net::DNSException & e)
-            {
-                /// Most likely DNS is not available now
-                dns_error = true;
-                LOG_ERROR(log, "Cannot use ZooKeeper host {} due to DNS error: {}", host_string, e.displayText());
-            }
-        }
-
-        if (nodes.empty())
-        {
-            /// For DNS errors we throw an exception with ZCONNECTIONLOSS code, so it will be considered a hardware error, not a user error
-            if (dns_error)
-                throw KeeperException::fromMessage(Coordination::Error::ZCONNECTIONLOSS, "Cannot resolve any of provided ZooKeeper hosts due to DNS error");
-            else
-                throw KeeperException::fromMessage(Coordination::Error::ZCONNECTIONLOSS, "Cannot use any of provided ZooKeeper nodes");
-        }
-
-        impl = std::make_unique<Coordination::ZooKeeper>(nodes, args, zk_log);
+        impl = std::make_unique<Coordination::ZooKeeper>(shuffled_hosts, args, zk_log);
+        Int8 node_idx = impl->getConnectedNodeIdx();
 
         if (args.chroot.empty())
             LOG_TRACE(log, "Initialized, hosts: {}", fmt::join(args.hosts, ","));
         else
             LOG_TRACE(log, "Initialized, hosts: {}, chroot: {}", fmt::join(args.hosts, ","), args.chroot);
+
+
+        /// If the balancing strategy has an optimal node, then it will be the first in the list
+        bool connected_to_suboptimal_node = node_idx != shuffled_hosts[0].original_index;
+        bool respect_az = args.prefer_local_availability_zone && !args.client_availability_zone.empty();
+        bool may_benefit_from_reconnecting = respect_az || args.get_priority_load_balancing.hasOptimalNode();
+        if (connected_to_suboptimal_node && may_benefit_from_reconnecting)
+        {
+            auto reconnect_timeout_sec = getSecondsUntilReconnect(args);
+            LOG_DEBUG(log, "Connected to a suboptimal ZooKeeper host ({}, index {})."
+                " To preserve balance in ZooKeeper usage, this ZooKeeper session will expire in {} seconds",
+                impl->getConnectedHostPort(), node_idx, reconnect_timeout_sec);
+
+            auto reconnect_task_holder = DB::Context::getGlobalContextInstance()->getSchedulePool().createTask("ZKReconnect", [this, optimal_host = shuffled_hosts[0]]()
+            {
+                try
+                {
+                    LOG_DEBUG(log, "Trying to connect to a more optimal node {}", optimal_host.host);
+                    ShuffleHosts node{optimal_host};
+                    std::unique_ptr<Coordination::ZooKeeper> new_impl = std::make_unique<Coordination::ZooKeeper>(node, args, zk_log);
+                    Int8 new_node_idx = new_impl->getConnectedNodeIdx();
+
+                    /// Maybe the node was unavailable when we got the AZs the first time; update just in case
+                    if (args.availability_zone_autodetect && availability_zones[new_node_idx].empty())
+                    {
+                        availability_zones[new_node_idx] = new_impl->tryGetAvailabilityZone();
+                        LOG_DEBUG(log, "Got availability zone for {}: {}", optimal_host.host, availability_zones[new_node_idx]);
+                    }
+
+                    optimal_impl = std::move(new_impl);
+                    impl->finalize("Connected to a more optimal node");
+                }
+                catch (...)
+                {
+                    LOG_WARNING(log, "Failed to connect to a more optimal ZooKeeper, will try again later: {}", DB::getCurrentExceptionMessage(/*with_stacktrace*/ false));
+                    (*reconnect_task)->scheduleAfter(getSecondsUntilReconnect(args) * 1000);
+                }
+            });
+            reconnect_task = std::make_unique<DB::BackgroundSchedulePoolTaskHolder>(std::move(reconnect_task_holder));
+            (*reconnect_task)->activate();
+            (*reconnect_task)->scheduleAfter(reconnect_timeout_sec * 1000);
+        }
     }
     else if (args.implementation == "testkeeper")
     {
@@ -152,29 +205,53 @@ void ZooKeeper::init(ZooKeeperArgs args_)
     }
 }
 
+ZooKeeper::~ZooKeeper()
+{
+    if (reconnect_task)
+        (*reconnect_task)->deactivate();
+}
 
 ZooKeeper::ZooKeeper(const ZooKeeperArgs & args_, std::shared_ptr<DB::ZooKeeperLog> zk_log_)
     : zk_log(std::move(zk_log_))
 {
-    init(args_);
+    init(args_, /*existing_impl*/ {});
+}
+
+
+ZooKeeper::ZooKeeper(const ZooKeeperArgs & args_, std::shared_ptr<DB::ZooKeeperLog> zk_log_, Strings availability_zones_, std::unique_ptr<Coordination::IKeeper> existing_impl)
+    : availability_zones(std::move(availability_zones_)), zk_log(std::move(zk_log_))
+{
+    if (availability_zones.size() != args_.hosts.size())
+        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Argument sizes mismatch: availability_zones count {} and hosts count {}",
+                            availability_zones.size(), args_.hosts.size());
+    init(args_, std::move(existing_impl));
 }
 
 ZooKeeper::ZooKeeper(const Poco::Util::AbstractConfiguration & config, const std::string & config_name, std::shared_ptr<DB::ZooKeeperLog> zk_log_)
     : zk_log(std::move(zk_log_))
 {
-    init(ZooKeeperArgs(config, config_name));
+    init(ZooKeeperArgs(config, config_name), /*existing_impl*/ {});
 }
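The ZKReconnect task above is a BackgroundSchedulePool task that reschedules itself on failure and is deactivated from the destructor (and, further below, from startNewSession()). A rough self-contained analogue of that lifecycle built on std::thread, shown only to illustrate the control flow; this is not the BackgroundSchedulePool API, and tryConnectOptimal() is a hypothetical stand-in for creating the new impl:

```cpp
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <random>
#include <thread>

class ReconnectTask
{
public:
    explicit ReconnectTask(std::function<bool()> try_reconnect_)
        : try_reconnect(std::move(try_reconnect_)) {}

    void start(std::chrono::seconds initial_delay)
    {
        worker = std::thread([this, initial_delay]
        {
            std::unique_lock lock(mutex);
            auto delay = initial_delay;
            while (!stopped)
            {
                if (cv.wait_for(lock, delay, [this] { return stopped.load(); }))
                    break;                                  /// deactivated, like task->deactivate()
                if (try_reconnect())
                    break;                                  /// connected to the optimal node
                delay = std::chrono::seconds(jitter(rng));  /// failed: reschedule later, like scheduleAfter()
            }
        });
    }

    ~ReconnectTask()
    {
        stopped = true;
        cv.notify_all();
        if (worker.joinable())
            worker.join();
    }

private:
    std::function<bool()> try_reconnect;
    std::atomic<bool> stopped{false};
    std::mutex mutex;
    std::condition_variable cv;
    std::mt19937 rng{std::random_device{}()};
    std::uniform_int_distribution<int> jitter{1, 10};
    std::thread worker;
};
```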
 
-std::vector<ShuffleHost> ZooKeeper::shuffleHosts() const
+ShuffleHosts ZooKeeper::shuffleHosts() const
 {
-    std::function<Priority(size_t index)> get_priority = args.get_priority_load_balancing.getPriorityFunc(args.get_priority_load_balancing.load_balancing, 0, args.hosts.size());
-    std::vector<ShuffleHost> shuffle_hosts;
+    std::function<Priority(size_t index)> get_priority = args.get_priority_load_balancing.getPriorityFunc(
+        args.get_priority_load_balancing.load_balancing, /* offset for first_or_random */ 0, args.hosts.size());
+    ShuffleHosts shuffle_hosts;
     for (size_t i = 0; i < args.hosts.size(); ++i)
     {
         ShuffleHost shuffle_host;
         shuffle_host.host = args.hosts[i];
         shuffle_host.original_index = static_cast<UInt8>(i);
+
+        shuffle_host.secure = startsWith(shuffle_host.host, "secure://");
+        if (shuffle_host.secure)
+            shuffle_host.host.erase(0, strlen("secure://"));
+
+        if (!args.client_availability_zone.empty() && !availability_zones[i].empty())
+            shuffle_host.az_info = availability_zones[i] == args.client_availability_zone ? ShuffleHost::SAME : ShuffleHost::OTHER;
+
         if (get_priority)
             shuffle_host.priority = get_priority(i);
         shuffle_host.randomize();
@@ -1023,7 +1100,10 @@ ZooKeeperPtr ZooKeeper::create(const Poco::Util::AbstractConfiguration & config,
 
 ZooKeeperPtr ZooKeeper::startNewSession() const
 {
-    auto res = std::shared_ptr<ZooKeeper>(new ZooKeeper(args, zk_log));
+    if (reconnect_task)
+        (*reconnect_task)->deactivate();
+
+    auto res = std::shared_ptr<ZooKeeper>(new ZooKeeper(args, zk_log, availability_zones, std::move(optimal_impl)));
     res->initSession();
     return res;
 }
@@ -1456,6 +1536,16 @@ int32_t ZooKeeper::getConnectionXid() const
     return impl->getConnectionXid();
 }
 
+String ZooKeeper::getConnectedHostAvailabilityZone() const
+{
+    if (args.implementation != "zookeeper" || !impl)
+        return "";
+    Int8 idx = impl->getConnectedNodeIdx();
+    if (idx < 0)
+        return ""; /// session expired
+    return availability_zones.at(idx);
+}
+
 size_t getFailedOpIndex(Coordination::Error exception_code, const Coordination::Responses & responses)
 {
     if (responses.empty())
diff --git a/src/Common/ZooKeeper/ZooKeeper.h b/src/Common/ZooKeeper/ZooKeeper.h
index 08ff60a80cf..4ae2cfa6096 100644
--- a/src/Common/ZooKeeper/ZooKeeper.h
+++ b/src/Common/ZooKeeper/ZooKeeper.h
@@ -32,6 +32,7 @@ namespace DB
 {
     class ZooKeeperLog;
     class ZooKeeperWithFaultInjection;
+    class BackgroundSchedulePoolTaskHolder;
 
     namespace ErrorCodes
     {
@@ -48,11 +49,23 @@ constexpr size_t MULTI_BATCH_SIZE = 100;
 
 struct ShuffleHost
 {
+    enum AvailabilityZoneInfo
+    {
+        SAME = 0,
+        UNKNOWN = 1,
+        OTHER = 2,
+    };
+
     String host;
+    bool secure = false;
     UInt8 original_index = 0;
+    AvailabilityZoneInfo az_info = UNKNOWN;
     Priority priority;
     UInt64 random = 0;
 
+    /// We should resolve it each time without caching
+    mutable std::optional<Poco::Net::SocketAddress> address;
+
     void randomize()
     {
         random = thread_local_rng();
@@ -60,11 +73,13 @@ struct ShuffleHost
 
     static bool compare(const ShuffleHost & lhs, const ShuffleHost & rhs)
     {
-        return std::forward_as_tuple(lhs.priority, lhs.random)
-            < std::forward_as_tuple(rhs.priority, rhs.random);
+        return std::forward_as_tuple(lhs.az_info, lhs.priority, lhs.random)
+            < std::forward_as_tuple(rhs.az_info, rhs.priority, rhs.random);
     }
 };
 
+using ShuffleHosts = std::vector<ShuffleHost>;
+
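Why the enum is ordered SAME = 0 < UNKNOWN = 1 < OTHER = 2: compare() puts az_info first in the tuple, so same-AZ hosts always sort ahead of hosts with unknown zones, which sort ahead of cross-AZ ones; priority and the random value only break ties within each group. A compilable demonstration with made-up hosts:

```cpp
#include <algorithm>
#include <cstdio>
#include <tuple>
#include <vector>

enum AzInfo { SAME = 0, UNKNOWN = 1, OTHER = 2 };

struct Host
{
    const char * name;
    AzInfo az_info;
    int priority;    /// lower is better, like Priority in ClickHouse
    unsigned random; /// per-session tiebreaker
};

int main()
{
    std::vector<Host> hosts = {
        {"zoo3", OTHER, 0, 7},
        {"zoo2", SAME, 0, 42},
        {"zoo1", UNKNOWN, 0, 1},
    };
    std::sort(hosts.begin(), hosts.end(), [](const Host & l, const Host & r)
    {
        return std::forward_as_tuple(l.az_info, l.priority, l.random)
             < std::forward_as_tuple(r.az_info, r.priority, r.random);
    });
    for (const auto & h : hosts)
        std::printf("%s\n", h.name); /// zoo2, zoo1, zoo3
}
```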
 struct RemoveException
 {
     explicit RemoveException(std::string_view path_ = "", bool remove_subtree_ = true)
@@ -197,6 +212,9 @@ class ZooKeeper
 
     explicit ZooKeeper(const ZooKeeperArgs & args_, std::shared_ptr<DB::ZooKeeperLog> zk_log_ = nullptr);
 
+    /// Allows keeping availability zone info when starting a new session
+    ZooKeeper(const ZooKeeperArgs & args_, std::shared_ptr<DB::ZooKeeperLog> zk_log_, Strings availability_zones_, std::unique_ptr<Coordination::IKeeper> existing_impl);
+
     /** Config of the form:
@@ -228,7 +246,9 @@ public:
     using Ptr = std::shared_ptr<ZooKeeper>;
     using ErrorsList = std::initializer_list<Coordination::Error>;
 
-    std::vector<ShuffleHost> shuffleHosts() const;
+    ~ZooKeeper();
+
+    ShuffleHosts shuffleHosts() const;
 
     static Ptr create(const Poco::Util::AbstractConfiguration & config,
                       const std::string & config_name,
@@ -596,8 +616,6 @@ public:
 
     UInt32 getSessionUptime() const { return static_cast<UInt32>(session_uptime.elapsedSeconds()); }
 
-    bool hasReachedDeadline() const { return impl->hasReachedDeadline(); }
-
     uint64_t getSessionTimeoutMS() const { return args.session_timeout_ms; }
 
     void setServerCompletelyStarted();
@@ -606,6 +624,8 @@ public:
     String getConnectedHostPort() const;
     int32_t getConnectionXid() const;
 
+    String getConnectedHostAvailabilityZone() const;
+
     const DB::KeeperFeatureFlags * getKeeperFeatureFlags() const { return impl->getKeeperFeatureFlags(); }
 
     /// Checks that our session was not killed, and allows us to avoid applying a request from an old lost session.
@@ -625,7 +645,8 @@ public:
     void addCheckSessionOp(Coordination::Requests & requests) const;
 
 private:
-    void init(ZooKeeperArgs args_);
+    void init(ZooKeeperArgs args_, std::unique_ptr<Coordination::IKeeper> existing_impl);
+    void updateAvailabilityZones();
 
     /// The following methods don't throw exceptions; they return error codes instead.
     Coordination::Error createImpl(const std::string & path, const std::string & data, int32_t mode, std::string & path_created);
@@ -690,15 +711,20 @@ private:
     }
 
     std::unique_ptr<Coordination::IKeeper> impl;
+    mutable std::unique_ptr<Coordination::IKeeper> optimal_impl;
 
     ZooKeeperArgs args;
 
+    Strings availability_zones;
+
     LoggerPtr log = nullptr;
     std::shared_ptr<DB::ZooKeeperLog> zk_log;
 
     AtomicStopwatch session_uptime;
 
     int32_t session_node_version;
+
+    std::unique_ptr<DB::BackgroundSchedulePoolTaskHolder> reconnect_task;
 };
+ key + ".port", "2181")); + availability_zones.push_back(config.getString(config_name + "." + key + ".availability_zone", "")); } else if (key == "session_timeout_ms") { @@ -199,6 +213,10 @@ void ZooKeeperArgs::initFromKeeperSection(const Poco::Util::AbstractConfiguratio { sessions_path = config.getString(config_name + "." + key); } + else if (key == "prefer_local_availability_zone") + { + prefer_local_availability_zone = config.getBool(config_name + "." + key); + } else if (key == "implementation") { implementation = config.getString(config_name + "." + key); @@ -207,10 +225,9 @@ void ZooKeeperArgs::initFromKeeperSection(const Poco::Util::AbstractConfiguratio { String load_balancing_str = config.getString(config_name + "." + key); /// Use magic_enum to avoid dependency from dbms (`SettingFieldLoadBalancingTraits::fromString(...)`) - auto load_balancing = magic_enum::enum_cast(Poco::toUpper(load_balancing_str)); + load_balancing = magic_enum::enum_cast(Poco::toUpper(load_balancing_str)); if (!load_balancing) throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unknown load balancing: {}", load_balancing_str); - get_priority_load_balancing.load_balancing = *load_balancing; } else if (key == "fallback_session_lifetime") { @@ -224,9 +241,19 @@ void ZooKeeperArgs::initFromKeeperSection(const Poco::Util::AbstractConfiguratio { use_compression = config.getBool(config_name + "." + key); } + else if (key == "availability_zone_autodetect") + { + availability_zone_autodetect = config.getBool(config_name + "." + key); + } else throw KeeperException(Coordination::Error::ZBADARGUMENTS, "Unknown key {} in config file", key); } + + if (load_balancing) + get_priority_load_balancing = DB::GetPriorityForLoadBalancing(*load_balancing, thread_local_rng() % hosts.size()); + + if (prefer_local_availability_zone) + client_availability_zone = DB::PlacementInfo::PlacementInfo::instance().getAvailabilityZone(); } } diff --git a/src/Common/ZooKeeper/ZooKeeperArgs.h b/src/Common/ZooKeeper/ZooKeeperArgs.h index 27ba173c0c3..945b77bf9c1 100644 --- a/src/Common/ZooKeeper/ZooKeeperArgs.h +++ b/src/Common/ZooKeeper/ZooKeeperArgs.h @@ -32,10 +32,12 @@ struct ZooKeeperArgs String zookeeper_name = "zookeeper"; String implementation = "zookeeper"; Strings hosts; + Strings availability_zones; String auth_scheme; String identity; String chroot; String sessions_path = "/clickhouse/sessions"; + String client_availability_zone; int32_t connection_timeout_ms = Coordination::DEFAULT_CONNECTION_TIMEOUT_MS; int32_t session_timeout_ms = Coordination::DEFAULT_SESSION_TIMEOUT_MS; int32_t operation_timeout_ms = Coordination::DEFAULT_OPERATION_TIMEOUT_MS; @@ -47,6 +49,8 @@ struct ZooKeeperArgs UInt64 send_sleep_ms = 0; UInt64 recv_sleep_ms = 0; bool use_compression = false; + bool prefer_local_availability_zone = false; + bool availability_zone_autodetect = false; SessionLifetimeConfiguration fallback_session_lifetime = {}; DB::GetPriorityForLoadBalancing get_priority_load_balancing; diff --git a/src/Common/ZooKeeper/ZooKeeperImpl.cpp b/src/Common/ZooKeeper/ZooKeeperImpl.cpp index ed7498b1ac9..8653af51308 100644 --- a/src/Common/ZooKeeper/ZooKeeperImpl.cpp +++ b/src/Common/ZooKeeper/ZooKeeperImpl.cpp @@ -23,6 +23,9 @@ #include #include +#include +#include + #include "Coordination/KeeperConstants.h" #include "config.h" @@ -338,7 +341,7 @@ ZooKeeper::~ZooKeeper() ZooKeeper::ZooKeeper( - const Nodes & nodes, + const zkutil::ShuffleHosts & nodes, const zkutil::ZooKeeperArgs & args_, std::shared_ptr zk_log_) : args(args_) @@ -426,7 +429,7 @@ 
diff --git a/src/Common/ZooKeeper/ZooKeeperImpl.cpp b/src/Common/ZooKeeper/ZooKeeperImpl.cpp
index ed7498b1ac9..8653af51308 100644
--- a/src/Common/ZooKeeper/ZooKeeperImpl.cpp
+++ b/src/Common/ZooKeeper/ZooKeeperImpl.cpp
@@ -23,6 +23,9 @@
 #include <...>
 #include <...>
 
+#include <...>
+#include <...>
+
 #include "Coordination/KeeperConstants.h"
 #include "config.h"
 
@@ -338,7 +341,7 @@ ZooKeeper::~ZooKeeper()
 
 
 ZooKeeper::ZooKeeper(
-    const Nodes & nodes,
+    const zkutil::ShuffleHosts & nodes,
     const zkutil::ZooKeeperArgs & args_,
     std::shared_ptr<ZooKeeperLog> zk_log_)
     : args(args_)
@@ -426,7 +429,7 @@ ZooKeeper::ZooKeeper(
 
 
 void ZooKeeper::connect(
-    const Nodes & nodes,
+    const zkutil::ShuffleHosts & nodes,
     Poco::Timespan connection_timeout)
 {
     if (nodes.empty())
@@ -434,15 +437,51 @@ void ZooKeeper::connect(
 
     static constexpr size_t num_tries = 3;
     bool connected = false;
+    bool dns_error = false;
+
+    size_t resolved_count = 0;
+    for (const auto & node : nodes)
+    {
+        try
+        {
+            const Poco::Net::SocketAddress host_socket_addr{node.host};
+            LOG_TRACE(log, "Adding ZooKeeper host {} ({}), az: {}, priority: {}", node.host, host_socket_addr.toString(), node.az_info, node.priority);
+            node.address = host_socket_addr;
+            ++resolved_count;
+        }
+        catch (const Poco::Net::HostNotFoundException & e)
+        {
+            /// Most likely it's a misconfiguration and a wrong hostname was specified
+            LOG_ERROR(log, "Cannot use ZooKeeper host {}, reason: {}", node.host, e.displayText());
+        }
+        catch (const Poco::Net::DNSException & e)
+        {
+            /// Most likely DNS is not available now
+            dns_error = true;
+            LOG_ERROR(log, "Cannot use ZooKeeper host {} due to DNS error: {}", node.host, e.displayText());
+        }
+    }
+
+    if (resolved_count == 0)
+    {
+        /// For DNS errors we throw an exception with ZCONNECTIONLOSS code, so it will be considered a hardware error, not a user error
+        if (dns_error)
+            throw zkutil::KeeperException::fromMessage(
+                Coordination::Error::ZCONNECTIONLOSS, "Cannot resolve any of provided ZooKeeper hosts due to DNS error");
+        else
+            throw zkutil::KeeperException::fromMessage(Coordination::Error::ZCONNECTIONLOSS, "Cannot use any of provided ZooKeeper nodes");
+    }
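The resolution loop above resolves every host up front and only fails connect() when nothing resolved, distinguishing "DNS itself is down" (retryable, reported as a ZCONNECTIONLOSS hardware-style error) from "a hostname in the config is wrong". A standalone POSIX sketch of the same classification, using getaddrinfo instead of Poco; the hostnames are examples:

```cpp
#include <netdb.h>
#include <cstdio>
#include <string>
#include <vector>

int main()
{
    std::vector<std::string> hosts = {"localhost", "no-such-host.invalid"};
    size_t resolved_count = 0;
    bool transient_error = false;

    for (const auto & host : hosts)
    {
        addrinfo hints{};
        hints.ai_family = AF_UNSPEC;
        hints.ai_socktype = SOCK_STREAM;
        addrinfo * res = nullptr;
        int rc = getaddrinfo(host.c_str(), "2181", &hints, &res);
        if (rc == 0)
        {
            ++resolved_count; /// keep the address, attempt connection later
            freeaddrinfo(res);
        }
        else if (rc == EAI_AGAIN)
        {
            transient_error = true; /// DNS is unhealthy; retrying may help
            std::fprintf(stderr, "DNS error for %s: %s\n", host.c_str(), gai_strerror(rc));
        }
        else
        {
            /// most likely a misconfigured hostname (e.g. EAI_NONAME)
            std::fprintf(stderr, "cannot use host %s: %s\n", host.c_str(), gai_strerror(rc));
        }
    }

    if (resolved_count == 0)
        return transient_error ? 2 /* "hardware" error */ : 1 /* user error */;
    std::printf("%zu host(s) usable\n", resolved_count);
}
```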
 
     WriteBufferFromOwnString fail_reasons;
     for (size_t try_no = 0; try_no < num_tries; ++try_no)
     {
-        for (size_t i = 0; i < nodes.size(); ++i)
+        for (const auto & node : nodes)
         {
-            const auto & node = nodes[i];
             try
             {
+                if (!node.address)
+                    continue;
+
                 /// Reset the state of previous attempt.
                 if (node.secure)
                 {
@@ -458,7 +497,7 @@ void ZooKeeper::connect(
                     socket = Poco::Net::StreamSocket();
                 }
 
-                socket.connect(node.address, connection_timeout);
+                socket.connect(*node.address, connection_timeout);
                 socket_address = socket.peerAddress();
 
                 socket.setReceiveTimeout(args.operation_timeout_ms * 1000);
@@ -498,27 +537,11 @@ void ZooKeeper::connect(
                 }
 
                 original_index = static_cast<Int8>(node.original_index);
-
-                if (i != 0)
-                {
-                    std::uniform_int_distribution<UInt32> fallback_session_lifetime_distribution
-                    {
-                        args.fallback_session_lifetime.min_sec,
-                        args.fallback_session_lifetime.max_sec,
-                    };
-                    UInt32 session_lifetime_seconds = fallback_session_lifetime_distribution(thread_local_rng);
-                    client_session_deadline = clock::now() + std::chrono::seconds(session_lifetime_seconds);
-
-                    LOG_DEBUG(log, "Connected to a suboptimal ZooKeeper host ({}, index {})."
-                        " To preserve balance in ZooKeeper usage, this ZooKeeper session will expire in {} seconds",
-                        node.address.toString(), i, session_lifetime_seconds);
-                }
-
                 break;
             }
             catch (...)
             {
-                fail_reasons << "\n" << getCurrentExceptionMessage(false) << ", " << node.address.toString();
+                fail_reasons << "\n" << getCurrentExceptionMessage(false) << ", " << node.address->toString();
             }
         }
 
@@ -532,6 +555,9 @@ void ZooKeeper::connect(
         bool first = true;
         for (const auto & node : nodes)
         {
+            if (!node.address)
+                continue;
+
             if (first)
                 first = false;
             else
@@ -540,7 +566,7 @@ void ZooKeeper::connect(
             if (node.secure)
                 message << "secure://";
 
-            message << node.address.toString();
+            message << node.address->toString();
         }
 
         message << fail_reasons.str() << "\n";
@@ -1153,7 +1179,6 @@ void ZooKeeper::pushRequest(RequestInfo && info)
 {
     try
     {
-        checkSessionDeadline();
         info.time = clock::now();
         auto maybe_zk_log = std::atomic_load(&zk_log);
         if (maybe_zk_log)
@@ -1201,44 +1226,44 @@ bool ZooKeeper::isFeatureEnabled(KeeperFeatureFlag feature_flag) const
     return keeper_feature_flags.isEnabled(feature_flag);
 }
 
-void ZooKeeper::initFeatureFlags()
+std::optional<String> ZooKeeper::tryGetSystemZnode(const std::string & path, const std::string & description)
 {
-    const auto try_get = [&](const std::string & path, const std::string & description) -> std::optional<String>
+    auto promise = std::make_shared<std::promise<Coordination::GetResponse>>();
+    auto future = promise->get_future();
+
+    auto callback = [promise](const Coordination::GetResponse & response) mutable
     {
-        auto promise = std::make_shared<std::promise<Coordination::GetResponse>>();
-        auto future = promise->get_future();
-
-        auto callback = [promise](const Coordination::GetResponse & response) mutable
-        {
-            promise->set_value(response);
-        };
-
-        get(path, std::move(callback), {});
-        if (future.wait_for(std::chrono::milliseconds(args.operation_timeout_ms)) != std::future_status::ready)
-            throw Exception(Error::ZOPERATIONTIMEOUT, "Failed to get {}: timeout", description);
-
-        auto response = future.get();
-
-        if (response.error == Coordination::Error::ZNONODE)
-        {
-            LOG_TRACE(log, "Failed to get {}", description);
-            return std::nullopt;
-        }
-        else if (response.error != Coordination::Error::ZOK)
-        {
-            throw Exception(response.error, "Failed to get {}", description);
-        }
-
-        return std::move(response.data);
+        promise->set_value(response);
     };
 
-    if (auto feature_flags = try_get(keeper_api_feature_flags_path, "feature flags"); feature_flags.has_value())
+    get(path, std::move(callback), {});
+    if (future.wait_for(std::chrono::milliseconds(args.operation_timeout_ms)) != std::future_status::ready)
+        throw Exception(Error::ZOPERATIONTIMEOUT, "Failed to get {}: timeout", description);
+
+    auto response = future.get();
+
+    if (response.error == Coordination::Error::ZNONODE)
+    {
+        LOG_TRACE(log, "Failed to get {}", description);
+        return std::nullopt;
+    }
+    else if (response.error != Coordination::Error::ZOK)
+    {
+        throw Exception(response.error, "Failed to get {}", description);
+    }
+
+    return std::move(response.data);
+}
+
+void ZooKeeper::initFeatureFlags()
+{
+    if (auto feature_flags = tryGetSystemZnode(keeper_api_feature_flags_path, "feature flags"); feature_flags.has_value())
     {
         keeper_feature_flags.setFeatureFlags(std::move(*feature_flags));
         return;
     }
 
-    auto keeper_api_version_string = try_get(keeper_api_version_path, "API version");
+    auto keeper_api_version_string = tryGetSystemZnode(keeper_api_version_path, "API version");
 
     DB::KeeperApiVersion keeper_api_version{DB::KeeperApiVersion::ZOOKEEPER_COMPATIBLE};
 
@@ -1256,6 +1281,17 @@ void ZooKeeper::initFeatureFlags()
     keeper_feature_flags.fromApiVersion(keeper_api_version);
 }
zone"); + if (res) + { + LOG_TRACE(log, "Availability zone for ZooKeeper at {}: {}", getConnectedHostPort(), *res); + return *res; + } + return ""; +} + void ZooKeeper::executeGenericRequest( const ZooKeeperRequestPtr & request, @@ -1587,17 +1623,6 @@ void ZooKeeper::setupFaultDistributions() inject_setup.test_and_set(); } -void ZooKeeper::checkSessionDeadline() const -{ - if (unlikely(hasReachedDeadline())) - throw Exception::fromMessage(Error::ZSESSIONEXPIRED, "Session expired (force expiry client-side)"); -} - -bool ZooKeeper::hasReachedDeadline() const -{ - return client_session_deadline.has_value() && clock::now() >= client_session_deadline.value(); -} - void ZooKeeper::maybeInjectSendFault() { if (unlikely(inject_setup.test() && send_inject_fault && send_inject_fault.value()(thread_local_rng))) diff --git a/src/Common/ZooKeeper/ZooKeeperImpl.h b/src/Common/ZooKeeper/ZooKeeperImpl.h index 8fdf0f97d9d..0c88c35b381 100644 --- a/src/Common/ZooKeeper/ZooKeeperImpl.h +++ b/src/Common/ZooKeeper/ZooKeeperImpl.h @@ -8,6 +8,7 @@ #include #include #include +#include #include #include @@ -102,21 +103,12 @@ using namespace DB; class ZooKeeper final : public IKeeper { public: - struct Node - { - Poco::Net::SocketAddress address; - UInt8 original_index; - bool secure; - }; - - using Nodes = std::vector; - /** Connection to nodes is performed in order. If you want, shuffle them manually. * Operation timeout couldn't be greater than session timeout. * Operation timeout applies independently for network read, network write, waiting for events and synchronization. */ ZooKeeper( - const Nodes & nodes, + const zkutil::ShuffleHosts & nodes, const zkutil::ZooKeeperArgs & args_, std::shared_ptr zk_log_); @@ -130,9 +122,7 @@ public: String getConnectedHostPort() const override { return (original_index == -1) ? "" : args.hosts[original_index]; } int32_t getConnectionXid() const override { return next_xid.load(); } - /// A ZooKeeper session can have an optional deadline set on it. - /// After it has been reached, the session needs to be finalized. - bool hasReachedDeadline() const override; + String tryGetAvailabilityZone() override; /// Useful to check owner of ephemeral node. 
diff --git a/src/Common/ZooKeeper/examples/zkutil_test_commands_new_lib.cpp b/src/Common/ZooKeeper/examples/zkutil_test_commands_new_lib.cpp
index 25d66b94b46..b3a1564b8ab 100644
--- a/src/Common/ZooKeeper/examples/zkutil_test_commands_new_lib.cpp
+++ b/src/Common/ZooKeeper/examples/zkutil_test_commands_new_lib.cpp
@@ -25,24 +25,24 @@ try
     Poco::Logger::root().setChannel(channel);
     Poco::Logger::root().setLevel("trace");
 
-    std::string hosts_arg = argv[1];
-    std::vector<std::string> hosts_strings;
-    splitInto<','>(hosts_strings, hosts_arg);
-    ZooKeeper::Nodes nodes;
-    nodes.reserve(hosts_strings.size());
-    for (size_t i = 0; i < hosts_strings.size(); ++i)
+    zkutil::ZooKeeperArgs args{argv[1]};
+    zkutil::ShuffleHosts nodes;
+    nodes.reserve(args.hosts.size());
+    for (size_t i = 0; i < args.hosts.size(); ++i)
     {
-        std::string host_string = hosts_strings[i];
-        bool secure = startsWith(host_string, "secure://");
+        zkutil::ShuffleHost node;
+        std::string host_string = args.hosts[i];
+        node.secure = startsWith(host_string, "secure://");
 
-        if (secure)
+        if (node.secure)
             host_string.erase(0, strlen("secure://"));
 
-        nodes.emplace_back(ZooKeeper::Node{Poco::Net::SocketAddress{host_string}, static_cast<UInt8>(i), secure});
+        node.host = host_string;
+        node.original_index = i;
+
+        nodes.emplace_back(node);
     }
 
-    zkutil::ZooKeeperArgs args;
     ZooKeeper zk(nodes, args, nullptr);
 
     Poco::Event event(true);
diff --git a/src/IO/S3/Credentials.cpp b/src/IO/S3/Credentials.cpp
index fa9d018eaa6..dfb7727fca4 100644
--- a/src/IO/S3/Credentials.cpp
+++ b/src/IO/S3/Credentials.cpp
@@ -9,6 +9,21 @@ namespace ErrorCodes
     extern const int UNSUPPORTED_METHOD;
 }
 
+namespace S3
+{
+    std::string tryGetRunningAvailabilityZone()
+    {
+        try
+        {
+            return getRunningAvailabilityZone();
+        }
+        catch (...)
+        {
+            tryLogCurrentException("tryGetRunningAvailabilityZone");
+            return "";
+        }
+    }
+}
 }
 
 #if USE_AWS_S3
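tryGetRunningAvailabilityZone() is the usual "try" companion to a throwing function: log the failure and collapse it into a neutral value, because a missing zone should degrade placement awareness rather than abort startup. A dependency-free sketch of the shape; the names and the simulated failure are made up:

```cpp
#include <cstdio>
#include <exception>
#include <stdexcept>
#include <string>

std::string getZoneOrThrow(); /// may throw, e.g. when the metadata service is unreachable

std::string tryGetZone()
{
    try
    {
        return getZoneOrThrow();
    }
    catch (const std::exception & e)
    {
        std::fprintf(stderr, "tryGetZone: %s\n", e.what()); /// log and continue
        return "";
    }
}

std::string getZoneOrThrow()
{
    throw std::runtime_error("169.254.169.254 not reachable"); /// simulated IMDS failure
}

int main()
{
    std::printf("zone: '%s'\n", tryGetZone().c_str()); /// zone: ''
}
```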
diff --git a/src/IO/S3/Credentials.h b/src/IO/S3/Credentials.h
index b8698d9b302..95297ab0538 100644
--- a/src/IO/S3/Credentials.h
+++ b/src/IO/S3/Credentials.h
@@ -24,6 +24,7 @@ static inline constexpr char GCP_METADATA_SERVICE_ENDPOINT[] = "http://metadata.google.internal";
 
 /// getRunningAvailabilityZone returns the availability zone of the underlying compute resources where the current process runs.
 std::string getRunningAvailabilityZone();
+std::string tryGetRunningAvailabilityZone();
 
 class AWSEC2MetadataClient : public Aws::Internal::AWSHttpResourceClient
 {
@@ -195,6 +196,7 @@ namespace DB
 namespace S3
 {
 std::string getRunningAvailabilityZone();
+std::string tryGetRunningAvailabilityZone();
 }
 
 }
diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp
index 82e9afb1d36..b946c2cb21e 100644
--- a/src/Interpreters/Context.cpp
+++ b/src/Interpreters/Context.cpp
@@ -3402,8 +3402,6 @@ zkutil::ZooKeeperPtr Context::getZooKeeper() const
     const auto & config = shared->zookeeper_config ? *shared->zookeeper_config : getConfigRef();
     if (!shared->zookeeper)
         shared->zookeeper = zkutil::ZooKeeper::create(config, zkutil::getZooKeeperConfigName(config), getZooKeeperLog());
-    else if (shared->zookeeper->hasReachedDeadline())
-        shared->zookeeper->finalize("ZooKeeper session has reached its deadline");
 
     if (shared->zookeeper->expired())
     {
diff --git a/src/Server/CloudPlacementInfo.cpp b/src/Server/CloudPlacementInfo.cpp
index 0790f825a45..d8810bb30de 100644
--- a/src/Server/CloudPlacementInfo.cpp
+++ b/src/Server/CloudPlacementInfo.cpp
@@ -11,6 +11,11 @@
 namespace DB
 {
 
+namespace ErrorCodes
+{
+    extern const int LOGICAL_ERROR;
+}
+
 namespace PlacementInfo
 {
 
@@ -46,7 +51,15 @@ PlacementInfo & PlacementInfo::instance()
 }
 
 void PlacementInfo::initialize(const Poco::Util::AbstractConfiguration & config)
+try
 {
+    if (!config.has(DB::PlacementInfo::PLACEMENT_CONFIG_PREFIX))
+    {
+        availability_zone = "";
+        initialized = true;
+        return;
+    }
+
     use_imds = config.getBool(getConfigPath("use_imds"), false);
 
     if (use_imds)
@@ -67,14 +80,17 @@ void PlacementInfo::initialize(const Poco::Util::AbstractConfiguration & config)
     LOG_DEBUG(log, "Loaded info: availability_zone: {}", availability_zone);
     initialized = true;
 }
+catch (...)
+{
+    tryLogCurrentException("Failed to get availability zone");
+    availability_zone = "";
+    initialized = true;
+}
 
 std::string PlacementInfo::getAvailabilityZone() const
 {
     if (!initialized)
-    {
-        LOG_WARNING(log, "Placement info has not been loaded");
-        return "";
-    }
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Placement info has not been loaded");
 
     return availability_zone;
 }
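PlacementInfo::initialize() above uses a function-try-block: the catch covers the entire body, downgrades any failure (for example, an IMDS lookup error) to the "no availability zone" state while still marking the object initialized, and getAvailabilityZone() turns use-before-initialize into a hard LOGICAL_ERROR. A minimal illustration of that fail-soft-init / fail-loud-read split, with hypothetical names:

```cpp
#include <stdexcept>
#include <string>

class Placement
{
public:
    void initialize(const std::string & endpoint)
    try
    {
        zone = fetchZone(endpoint); /// may throw
        initialized = true;
    }
    catch (...)
    {
        zone = "";          /// degrade to "unknown zone" instead of failing startup
        initialized = true;
    }

    std::string getZone() const
    {
        if (!initialized)
            throw std::logic_error("used before initialize()"); /// programming error
        return zone;
    }

private:
    static std::string fetchZone(const std::string & endpoint)
    {
        if (endpoint.empty())
            throw std::runtime_error("metadata endpoint unavailable");
        return "az2";
    }

    bool initialized = false;
    std::string zone;
};

int main()
{
    Placement p;
    p.initialize(""); /// fails internally, still leaves the object usable
    return p.getZone().empty() ? 0 : 1;
}
```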
diff --git a/src/Storages/System/StorageSystemZooKeeperConnection.cpp b/src/Storages/System/StorageSystemZooKeeperConnection.cpp
index 950e20512c0..ec29b84dac3 100644
--- a/src/Storages/System/StorageSystemZooKeeperConnection.cpp
+++ b/src/Storages/System/StorageSystemZooKeeperConnection.cpp
@@ -36,7 +36,8 @@ ColumnsDescription StorageSystemZooKeeperConnection::getColumnsDescription()
         /* 9 */ {"xid", std::make_shared<DataTypeInt32>(), "XID of the current session."},
         /* 10*/ {"enabled_feature_flags", std::make_shared<DataTypeArray>(std::move(feature_flags_enum)),
             "Feature flags which are enabled. Only applicable to ClickHouse Keeper."
-        }
+        },
+        /* 11*/ {"availability_zone", std::make_shared<DataTypeString>(), "Availability zone"},
     };
 }
 
@@ -85,6 +86,7 @@ void StorageSystemZooKeeperConnection::fillData(MutableColumns & res_columns, Co
         columns[8]->insert(zookeeper->getClientID());
         columns[9]->insert(zookeeper->getConnectionXid());
         add_enabled_feature_flags(zookeeper);
+        columns[11]->insert(zookeeper->getConnectedHostAvailabilityZone());
     }
 };
diff --git a/tests/integration/test_zookeeper_config_load_balancing/configs/zookeeper_load_balancing2.xml b/tests/integration/test_zookeeper_config_load_balancing/configs/zookeeper_load_balancing2.xml
new file mode 100644
index 00000000000..fd416cad505
--- /dev/null
+++ b/tests/integration/test_zookeeper_config_load_balancing/configs/zookeeper_load_balancing2.xml
@@ -0,0 +1,35 @@
+<clickhouse>
+    <zookeeper>
+        <load_balancing>random</load_balancing>
+        <!-- prefer connecting to a Keeper node in the same availability zone -->
+        <prefer_local_availability_zone>1</prefer_local_availability_zone>
+        <!-- reconnect to the optimal node quickly (for the test) -->
+        <fallback_session_lifetime>
+            <min>0</min>
+            <max>1</max>
+        </fallback_session_lifetime>
+
+        <node index="1">
+            <host>zoo1</host>
+            <port>2181</port>
+            <availability_zone>az1</availability_zone>
+        </node>
+        <node index="2">
+            <host>zoo2</host>
+            <port>2181</port>
+            <availability_zone>az2</availability_zone>
+        </node>
+        <node index="3">
+            <host>zoo3</host>
+            <port>2181</port>
+            <availability_zone>az3</availability_zone>
+        </node>
+        <session_timeout_ms>3000</session_timeout_ms>
+    </zookeeper>
+
+    <placement>
+        <!-- the client's own zone; IMDS autodetection is disabled in tests -->
+        <use_imds>0</use_imds>
+        <availability_zone>az2</availability_zone>
+    </placement>
+</clickhouse>
diff --git a/tests/integration/test_zookeeper_config_load_balancing/test.py b/tests/integration/test_zookeeper_config_load_balancing/test.py
index f17e0c3f03f..9cdf7db2b08 100644
--- a/tests/integration/test_zookeeper_config_load_balancing/test.py
+++ b/tests/integration/test_zookeeper_config_load_balancing/test.py
@@ -1,6 +1,8 @@
+import time
 import pytest
 from helpers.cluster import ClickHouseCluster
 from helpers.network import PartitionManager
+from helpers.test_tools import assert_eq_with_retry
 
 cluster = ClickHouseCluster(
     __file__, zookeeper_config_path="configs/zookeeper_load_balancing.xml"
@@ -17,6 +19,10 @@ node3 = cluster.add_instance(
     "nod3", with_zookeeper=True, main_configs=["configs/zookeeper_load_balancing.xml"]
 )
 
+node4 = cluster.add_instance(
+    "nod4", with_zookeeper=True, main_configs=["configs/zookeeper_load_balancing2.xml"]
+)
+
 
 def change_balancing(old, new, reload=True):
     line = "<load_balancing>{}</load_balancing>"
@@ -405,113 +411,57 @@ def test_hostname_levenshtein_distance(started_cluster):
 def test_round_robin(started_cluster):
     pm = PartitionManager()
     try:
-        pm._add_rule(
-            {
-                "source": node1.ip_address,
-                "destination": cluster.get_instance_ip("zoo1"),
-                "action": "REJECT --reject-with tcp-reset",
-            }
-        )
-        pm._add_rule(
-            {
-                "source": node2.ip_address,
-                "destination": cluster.get_instance_ip("zoo1"),
-                "action": "REJECT --reject-with tcp-reset",
-            }
-        )
-        pm._add_rule(
-            {
-                "source": node3.ip_address,
-                "destination": cluster.get_instance_ip("zoo1"),
-                "action": "REJECT --reject-with tcp-reset",
-            }
-        )
         change_balancing("random", "round_robin")
-
-        print(
-            str(
-                node1.exec_in_container(
-                    [
-                        "bash",
-                        "-c",
-                        "lsof -a -i4 -i6 -itcp -w | grep ':2181' | grep ESTABLISHED",
-                    ],
-                    privileged=True,
-                    user="root",
-                )
-            )
-        )
-        assert (
-            "1"
-            == str(
-                node1.exec_in_container(
-                    [
-                        "bash",
-                        "-c",
-                        "lsof -a -i4 -i6 -itcp -w | grep 'testzookeeperconfigloadbalancing_zoo2_1.*testzookeeperconfigloadbalancing_default:2181' | grep ESTABLISHED | wc -l",
-                    ],
-                    privileged=True,
-                    user="root",
-                )
-            ).strip()
-        )
-
-        print(
-            str(
-                node2.exec_in_container(
-                    [
-                        "bash",
-                        "-c",
-                        "lsof -a -i4 -i6 -itcp -w | grep ':2181' | grep ESTABLISHED",
-                    ],
-                    privileged=True,
-                    user="root",
-                )
-            )
-        )
-        assert (
-            "1"
-            == str(
-                node2.exec_in_container(
-                    [
-                        "bash",
-                        "-c",
-                        "lsof -a -i4 -i6 -itcp -w | grep 'testzookeeperconfigloadbalancing_zoo2_1.*testzookeeperconfigloadbalancing_default:2181' | grep ESTABLISHED | wc -l",
-                    ],
-                    privileged=True,
-                    user="root",
-                )
-            ).strip()
-        )
-
-        print(
-            str(
-                node3.exec_in_container(
-                    [
-                        "bash",
-                        "-c",
-                        "lsof -a -i4 -i6 -itcp -w | grep ':2181' | grep ESTABLISHED",
-                    ],
-                    privileged=True,
-                    user="root",
-                )
-            )
-        )
-        assert (
-            "1"
-            == str(
-                node3.exec_in_container(
-                    [
-                        "bash",
-                        "-c",
-                        "lsof -a -i4 -i6 -itcp -w | grep 'testzookeeperconfigloadbalancing_zoo2_1.*testzookeeperconfigloadbalancing_default:2181' | grep ESTABLISHED | wc -l",
-                    ],
-                    privileged=True,
-                    user="root",
-                )
-            ).strip()
-        )
-
+        for node in [node1, node2, node3]:
+            idx = int(
+                node.query("select index from system.zookeeper_connection").strip()
+            )
+            new_idx = (idx + 1) % 3
+
+            pm._add_rule(
+                {
+                    "source": node.ip_address,
+                    "destination": cluster.get_instance_ip("zoo" + str(idx + 1)),
+                    "action": "REJECT --reject-with tcp-reset",
+                }
+            )
+
+            assert_eq_with_retry(
+                node,
+                "select index from system.zookeeper_connection",
+                str(new_idx) + "\n",
+            )
+
+        pm.heal_all()
     finally:
         pm.heal_all()
         change_balancing("round_robin", "random", reload=False)
+
+
+def test_az(started_cluster):
+    pm = PartitionManager()
+    try:
+        # make sure it disconnects from the optimal node
+        pm._add_rule(
+            {
+                "source": node4.ip_address,
+                "destination": cluster.get_instance_ip("zoo2"),
+                "action": "REJECT --reject-with tcp-reset",
+            }
+        )
+
+        node4.query_with_retry("select * from system.zookeeper where path='/'")
+        assert "az2\n" != node4.query(
+            "select availability_zone from system.zookeeper_connection"
+        )
+
+        # fallback_session_lifetime.max is 1 second, but it shouldn't drop the current session until the node becomes available
+
+        time.sleep(5)  # this is fine
+        assert 5 <= int(node4.query("select zookeeperSessionUptime()").strip())
+
+        pm.heal_all()
+        assert_eq_with_retry(
+            node4, "select availability_zone from system.zookeeper_connection", "az2\n"
+        )
+    finally:
+        pm.heal_all()
diff --git a/tests/integration/test_zookeeper_fallback_session/test.py b/tests/integration/test_zookeeper_fallback_session/test.py
index 9afabfa3da3..932bbe482d2 100644
--- a/tests/integration/test_zookeeper_fallback_session/test.py
+++ b/tests/integration/test_zookeeper_fallback_session/test.py
@@ -84,10 +84,28 @@ def test_fallback_session(started_cluster: ClickHouseCluster):
         )
 
     # at this point network partitioning has been reverted.
-    # the nodes should switch to zoo1 automatically because of `in_order` load-balancing.
+    # the nodes should switch to zoo1 because of `in_order` load-balancing.
     # otherwise they would connect to a random replica
+
+    # but there's no reason to reconnect because the current session works,
+    # and there's no "optimal" node with `in_order` load-balancing,
+    # so we need to break the current session
+
     for node in [node1, node2, node3]:
-        assert_uses_zk_node(node, "zoo1")
+        assert_uses_zk_node(node, "zoo3")
+
+    with PartitionManager() as pm:
+        for node in started_cluster.instances.values():
+            pm._add_rule(
+                {
+                    "source": node.ip_address,
+                    "destination": cluster.get_instance_ip("zoo3"),
+                    "action": "REJECT --reject-with tcp-reset",
+                }
+            )
+
+        for node in [node1, node2, node3]:
+            assert_uses_zk_node(node, "zoo1")
 
     node1.query_with_retry("INSERT INTO simple VALUES ({0}, {0})".format(2))
     for node in [node2, node3]:
diff --git a/utils/keeper-bench/Runner.cpp b/utils/keeper-bench/Runner.cpp
index ed7e09685f0..5ae4c7a0b1c 100644
--- a/utils/keeper-bench/Runner.cpp
+++ b/utils/keeper-bench/Runner.cpp
@@ -1238,9 +1238,13 @@ void Runner::createConnections()
 
 std::shared_ptr<Coordination::ZooKeeper> Runner::getConnection(const ConnectionInfo & connection_info, size_t connection_info_idx)
 {
-    Coordination::ZooKeeper::Node node{Poco::Net::SocketAddress{connection_info.host}, static_cast<UInt8>(connection_info_idx), connection_info.secure};
-    std::vector<Coordination::ZooKeeper::Node> nodes;
-    nodes.push_back(node);
+    zkutil::ShuffleHost host;
+    host.host = connection_info.host;
+    host.secure = connection_info.secure;
+    host.original_index = static_cast<UInt8>(connection_info_idx);
+    host.address = Poco::Net::SocketAddress{connection_info.host};
+
+    zkutil::ShuffleHosts nodes{host};
 
     zkutil::ZooKeeperArgs args;
     args.session_timeout_ms = connection_info.session_timeout_ms;
     args.connection_timeout_ms = connection_info.connection_timeout_ms;