mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 09:32:06 +00:00
Merge pull request #30498 from ClickHouse/zookeeper_client_fault_injection
Add fault injection in ZooKeeper client
This commit is contained in:
commit
5a51498af1
@ -38,6 +38,7 @@ function install_packages()
|
||||
|
||||
function configure()
|
||||
{
|
||||
export ZOOKEEPER_FAULT_INJECTION=1
|
||||
# install test configs
|
||||
export USE_DATABASE_ORDINARY=1
|
||||
export EXPORT_S3_STORAGE_POLICIES=1
|
||||
|
@ -168,7 +168,7 @@ def prepare_for_hung_check(drop_databases):
|
||||
for db in databases:
|
||||
if db == "system":
|
||||
continue
|
||||
command = make_query_command(f"DROP DATABASE {db}")
|
||||
command = make_query_command(f'DETACH DATABASE {db}')
|
||||
# we don't wait for drop
|
||||
Popen(command, shell=True)
|
||||
break
|
||||
|
@ -428,6 +428,12 @@ public:
|
||||
Exception(const Error code_, const std::string & path); /// NOLINT
|
||||
Exception(const Exception & exc);
|
||||
|
||||
template <typename... Args>
|
||||
Exception(const Error code_, fmt::format_string<Args...> fmt, Args &&... args)
|
||||
: Exception(fmt::format(fmt, std::forward<Args>(args)...), code_)
|
||||
{
|
||||
}
|
||||
|
||||
const char * name() const noexcept override { return "Coordination::Exception"; }
|
||||
const char * className() const noexcept override { return "Coordination::Exception"; }
|
||||
Exception * clone() const override { return new Exception(*this); }
|
||||
|
@ -507,15 +507,15 @@ ResponsePtr TestKeeperSyncRequest::createResponse() const { return std::make_sha
|
||||
ResponsePtr TestKeeperMultiRequest::createResponse() const { return std::make_shared<MultiResponse>(); }
|
||||
|
||||
|
||||
TestKeeper::TestKeeper(const String & root_path_, Poco::Timespan operation_timeout_)
|
||||
: root_path(root_path_), operation_timeout(operation_timeout_)
|
||||
TestKeeper::TestKeeper(const zkutil::ZooKeeperArgs & args_)
|
||||
: args(args_)
|
||||
{
|
||||
container.emplace("/", Node());
|
||||
|
||||
if (!root_path.empty())
|
||||
if (!args.chroot.empty())
|
||||
{
|
||||
if (root_path.back() == '/')
|
||||
root_path.pop_back();
|
||||
if (args.chroot.back() == '/')
|
||||
args.chroot.pop_back();
|
||||
}
|
||||
|
||||
processing_thread = ThreadFromGlobalPool([this] { processingThread(); });
|
||||
@ -547,7 +547,7 @@ void TestKeeper::processingThread()
|
||||
{
|
||||
RequestInfo info;
|
||||
|
||||
UInt64 max_wait = static_cast<UInt64>(operation_timeout.totalMilliseconds());
|
||||
UInt64 max_wait = static_cast<UInt64>(args.operation_timeout_ms);
|
||||
if (requests_queue.tryPop(info, max_wait))
|
||||
{
|
||||
if (expired)
|
||||
@ -556,7 +556,7 @@ void TestKeeper::processingThread()
|
||||
|
||||
++zxid;
|
||||
|
||||
info.request->addRootPath(root_path);
|
||||
info.request->addRootPath(args.chroot);
|
||||
auto [response, _] = info.request->process(container, zxid);
|
||||
|
||||
if (info.watch)
|
||||
@ -580,7 +580,7 @@ void TestKeeper::processingThread()
|
||||
if (response->error == Error::ZOK)
|
||||
info.request->processWatches(watches, list_watches);
|
||||
|
||||
response->removeRootPath(root_path);
|
||||
response->removeRootPath(args.chroot);
|
||||
if (info.callback)
|
||||
info.callback(*response);
|
||||
}
|
||||
@ -689,7 +689,7 @@ void TestKeeper::pushRequest(RequestInfo && request)
|
||||
if (expired)
|
||||
throw Exception("Session expired", Error::ZSESSIONEXPIRED);
|
||||
|
||||
if (!requests_queue.tryPush(std::move(request), operation_timeout.totalMilliseconds()))
|
||||
if (!requests_queue.tryPush(std::move(request), args.operation_timeout_ms))
|
||||
throw Exception("Cannot push request to queue within operation timeout", Error::ZOPERATIONTIMEOUT);
|
||||
}
|
||||
catch (...)
|
||||
|
@ -8,6 +8,7 @@
|
||||
|
||||
#include <Poco/Timespan.h>
|
||||
#include <Common/ZooKeeper/IKeeper.h>
|
||||
#include <Common/ZooKeeper/ZooKeeperArgs.h>
|
||||
#include <Common/ThreadPool.h>
|
||||
#include <Common/ConcurrentBoundedQueue.h>
|
||||
|
||||
@ -33,7 +34,7 @@ using TestKeeperRequestPtr = std::shared_ptr<TestKeeperRequest>;
|
||||
class TestKeeper final : public IKeeper
|
||||
{
|
||||
public:
|
||||
TestKeeper(const String & root_path_, Poco::Timespan operation_timeout_);
|
||||
TestKeeper(const zkutil::ZooKeeperArgs & args_);
|
||||
~TestKeeper() override;
|
||||
|
||||
bool isExpired() const override { return expired; }
|
||||
@ -123,10 +124,7 @@ private:
|
||||
|
||||
Container container;
|
||||
|
||||
String root_path;
|
||||
ACLs default_acls;
|
||||
|
||||
Poco::Timespan operation_timeout;
|
||||
zkutil::ZooKeeperArgs args;
|
||||
|
||||
std::mutex push_request_mutex;
|
||||
std::atomic<bool> expired{false};
|
||||
|
@ -6,20 +6,18 @@
|
||||
#include <functional>
|
||||
#include <filesystem>
|
||||
|
||||
#include <Common/randomSeed.h>
|
||||
#include <base/find_symbols.h>
|
||||
#include <base/sort.h>
|
||||
#include <base/getFQDNOrHostName.h>
|
||||
#include "Common/ZooKeeper/IKeeper.h"
|
||||
#include <Common/StringUtils/StringUtils.h>
|
||||
#include <Common/Exception.h>
|
||||
#include <Common/isLocalAddress.h>
|
||||
|
||||
#include <Poco/Net/NetException.h>
|
||||
#include <Poco/Net/DNS.h>
|
||||
|
||||
|
||||
#define ZOOKEEPER_CONNECTION_TIMEOUT_MS 1000
|
||||
|
||||
namespace fs = std::filesystem;
|
||||
|
||||
namespace DB
|
||||
@ -49,25 +47,19 @@ static void check(Coordination::Error code, const std::string & path)
|
||||
}
|
||||
|
||||
|
||||
void ZooKeeper::init(const std::string & implementation_, const Strings & hosts_, const std::string & identity_,
|
||||
int32_t session_timeout_ms_, int32_t operation_timeout_ms_, const std::string & chroot_, const GetPriorityForLoadBalancing & get_priority_load_balancing_)
|
||||
{
|
||||
log = &Poco::Logger::get("ZooKeeper");
|
||||
hosts = hosts_;
|
||||
identity = identity_;
|
||||
session_timeout_ms = session_timeout_ms_;
|
||||
operation_timeout_ms = operation_timeout_ms_;
|
||||
chroot = chroot_;
|
||||
implementation = implementation_;
|
||||
get_priority_load_balancing = get_priority_load_balancing_;
|
||||
void ZooKeeper::init(ZooKeeperArgs args_)
|
||||
|
||||
if (implementation == "zookeeper")
|
||||
{
|
||||
args = std::move(args_);
|
||||
log = &Poco::Logger::get("ZooKeeper");
|
||||
|
||||
if (args.implementation == "zookeeper")
|
||||
{
|
||||
if (hosts.empty())
|
||||
if (args.hosts.empty())
|
||||
throw KeeperException("No hosts passed to ZooKeeper constructor.", Coordination::Error::ZBADARGUMENTS);
|
||||
|
||||
Coordination::ZooKeeper::Nodes nodes;
|
||||
nodes.reserve(hosts.size());
|
||||
nodes.reserve(args.hosts.size());
|
||||
|
||||
/// Shuffle the hosts to distribute the load among ZooKeeper nodes.
|
||||
std::vector<ShuffleHost> shuffled_hosts = shuffleHosts();
|
||||
@ -108,33 +100,23 @@ void ZooKeeper::init(const std::string & implementation_, const Strings & hosts_
|
||||
throw KeeperException("Cannot use any of provided ZooKeeper nodes", Coordination::Error::ZBADARGUMENTS);
|
||||
}
|
||||
|
||||
impl = std::make_unique<Coordination::ZooKeeper>(
|
||||
nodes,
|
||||
chroot,
|
||||
identity_.empty() ? "" : "digest",
|
||||
identity_,
|
||||
Poco::Timespan(0, session_timeout_ms_ * 1000),
|
||||
Poco::Timespan(0, ZOOKEEPER_CONNECTION_TIMEOUT_MS * 1000),
|
||||
Poco::Timespan(0, operation_timeout_ms_ * 1000),
|
||||
zk_log);
|
||||
impl = std::make_unique<Coordination::ZooKeeper>(nodes, args, zk_log);
|
||||
|
||||
if (chroot.empty())
|
||||
LOG_TRACE(log, "Initialized, hosts: {}", fmt::join(hosts, ","));
|
||||
if (args.chroot.empty())
|
||||
LOG_TRACE(log, "Initialized, hosts: {}", fmt::join(args.hosts, ","));
|
||||
else
|
||||
LOG_TRACE(log, "Initialized, hosts: {}, chroot: {}", fmt::join(hosts, ","), chroot);
|
||||
LOG_TRACE(log, "Initialized, hosts: {}, chroot: {}", fmt::join(args.hosts, ","), args.chroot);
|
||||
}
|
||||
else if (implementation == "testkeeper")
|
||||
else if (args.implementation == "testkeeper")
|
||||
{
|
||||
impl = std::make_unique<Coordination::TestKeeper>(
|
||||
chroot,
|
||||
Poco::Timespan(0, operation_timeout_ms_ * 1000));
|
||||
impl = std::make_unique<Coordination::TestKeeper>(args);
|
||||
}
|
||||
else
|
||||
{
|
||||
throw DB::Exception("Unknown implementation of coordination service: " + implementation, DB::ErrorCodes::NOT_IMPLEMENTED);
|
||||
throw DB::Exception("Unknown implementation of coordination service: " + args.implementation, DB::ErrorCodes::NOT_IMPLEMENTED);
|
||||
}
|
||||
|
||||
if (!chroot.empty())
|
||||
if (!args.chroot.empty())
|
||||
{
|
||||
/// Here we check that zk root exists.
|
||||
/// This check is clumsy. The reason is we do this request under common mutex, and never want to hung here.
|
||||
@ -144,7 +126,7 @@ void ZooKeeper::init(const std::string & implementation_, const Strings & hosts_
|
||||
/// This should not happen now, when memory tracker is disabled.
|
||||
/// But let's keep it just in case (it is also easy to backport).
|
||||
auto future = asyncExists("/");
|
||||
auto res = future.wait_for(std::chrono::milliseconds(operation_timeout_ms));
|
||||
auto res = future.wait_for(std::chrono::milliseconds(args.operation_timeout_ms));
|
||||
if (res != std::future_status::ready)
|
||||
throw KeeperException("Cannot check if zookeeper root exists.", Coordination::Error::ZOPERATIONTIMEOUT);
|
||||
|
||||
@ -153,18 +135,30 @@ void ZooKeeper::init(const std::string & implementation_, const Strings & hosts_
|
||||
throw KeeperException(code, "/");
|
||||
|
||||
if (code == Coordination::Error::ZNONODE)
|
||||
throw KeeperException("ZooKeeper root doesn't exist. You should create root node " + chroot + " before start.", Coordination::Error::ZNONODE);
|
||||
throw KeeperException("ZooKeeper root doesn't exist. You should create root node " + args.chroot + " before start.", Coordination::Error::ZNONODE);
|
||||
}
|
||||
}
|
||||
|
||||
ZooKeeper::ZooKeeper(const ZooKeeperArgs & args_, std::shared_ptr<DB::ZooKeeperLog> zk_log_)
|
||||
{
|
||||
zk_log = std::move(zk_log_);
|
||||
init(args_);
|
||||
}
|
||||
|
||||
ZooKeeper::ZooKeeper(const Poco::Util::AbstractConfiguration & config, const std::string & config_name, std::shared_ptr<DB::ZooKeeperLog> zk_log_)
|
||||
: zk_log(std::move(zk_log_))
|
||||
{
|
||||
init(ZooKeeperArgs(config, config_name));
|
||||
}
|
||||
|
||||
std::vector<ShuffleHost> ZooKeeper::shuffleHosts() const
|
||||
{
|
||||
std::function<size_t(size_t index)> get_priority = get_priority_load_balancing.getPriorityFunc(get_priority_load_balancing.load_balancing, 0, hosts.size());
|
||||
std::function<size_t(size_t index)> get_priority = args.get_priority_load_balancing.getPriorityFunc(args.get_priority_load_balancing.load_balancing, 0, args.hosts.size());
|
||||
std::vector<ShuffleHost> shuffle_hosts;
|
||||
for (size_t i = 0; i < hosts.size(); ++i)
|
||||
for (size_t i = 0; i < args.hosts.size(); ++i)
|
||||
{
|
||||
ShuffleHost shuffle_host;
|
||||
shuffle_host.host = hosts[i];
|
||||
shuffle_host.host = args.hosts[i];
|
||||
if (get_priority)
|
||||
shuffle_host.priority = get_priority(i);
|
||||
shuffle_host.randomize();
|
||||
@ -181,125 +175,16 @@ std::vector<ShuffleHost> ZooKeeper::shuffleHosts() const
|
||||
return shuffle_hosts;
|
||||
}
|
||||
|
||||
ZooKeeper::ZooKeeper(const std::string & hosts_string, const std::string & identity_, int32_t session_timeout_ms_,
|
||||
int32_t operation_timeout_ms_, const std::string & chroot_, const std::string & implementation_,
|
||||
std::shared_ptr<DB::ZooKeeperLog> zk_log_, const GetPriorityForLoadBalancing & get_priority_load_balancing_)
|
||||
{
|
||||
zk_log = std::move(zk_log_);
|
||||
Strings hosts_strings;
|
||||
splitInto<','>(hosts_strings, hosts_string);
|
||||
|
||||
init(implementation_, hosts_strings, identity_, session_timeout_ms_, operation_timeout_ms_, chroot_, get_priority_load_balancing_);
|
||||
}
|
||||
|
||||
ZooKeeper::ZooKeeper(const Strings & hosts_, const std::string & identity_, int32_t session_timeout_ms_,
|
||||
int32_t operation_timeout_ms_, const std::string & chroot_, const std::string & implementation_,
|
||||
std::shared_ptr<DB::ZooKeeperLog> zk_log_, const GetPriorityForLoadBalancing & get_priority_load_balancing_)
|
||||
{
|
||||
zk_log = std::move(zk_log_);
|
||||
init(implementation_, hosts_, identity_, session_timeout_ms_, operation_timeout_ms_, chroot_, get_priority_load_balancing_);
|
||||
}
|
||||
|
||||
struct ZooKeeperArgs
|
||||
{
|
||||
ZooKeeperArgs(const Poco::Util::AbstractConfiguration & config, const std::string & config_name)
|
||||
{
|
||||
Poco::Util::AbstractConfiguration::Keys keys;
|
||||
config.keys(config_name, keys);
|
||||
|
||||
session_timeout_ms = Coordination::DEFAULT_SESSION_TIMEOUT_MS;
|
||||
operation_timeout_ms = Coordination::DEFAULT_OPERATION_TIMEOUT_MS;
|
||||
implementation = "zookeeper";
|
||||
for (const auto & key : keys)
|
||||
{
|
||||
if (startsWith(key, "node"))
|
||||
{
|
||||
hosts.push_back(
|
||||
(config.getBool(config_name + "." + key + ".secure", false) ? "secure://" : "") +
|
||||
config.getString(config_name + "." + key + ".host") + ":"
|
||||
+ config.getString(config_name + "." + key + ".port", "2181")
|
||||
);
|
||||
}
|
||||
else if (key == "session_timeout_ms")
|
||||
{
|
||||
session_timeout_ms = config.getInt(config_name + "." + key);
|
||||
}
|
||||
else if (key == "operation_timeout_ms")
|
||||
{
|
||||
operation_timeout_ms = config.getInt(config_name + "." + key);
|
||||
}
|
||||
else if (key == "identity")
|
||||
{
|
||||
identity = config.getString(config_name + "." + key);
|
||||
}
|
||||
else if (key == "root")
|
||||
{
|
||||
chroot = config.getString(config_name + "." + key);
|
||||
}
|
||||
else if (key == "implementation")
|
||||
{
|
||||
implementation = config.getString(config_name + "." + key);
|
||||
}
|
||||
else if (key == "zookeeper_load_balancing")
|
||||
{
|
||||
String load_balancing_str = config.getString(config_name + "." + key);
|
||||
/// Use magic_enum to avoid dependency from dbms (`SettingFieldLoadBalancingTraits::fromString(...)`)
|
||||
auto load_balancing = magic_enum::enum_cast<DB::LoadBalancing>(Poco::toUpper(load_balancing_str));
|
||||
if (!load_balancing)
|
||||
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unknown load balancing: {}", load_balancing_str);
|
||||
get_priority_load_balancing.load_balancing = *load_balancing;
|
||||
}
|
||||
else
|
||||
throw KeeperException(std::string("Unknown key ") + key + " in config file", Coordination::Error::ZBADARGUMENTS);
|
||||
}
|
||||
|
||||
if (!chroot.empty())
|
||||
{
|
||||
if (chroot.front() != '/')
|
||||
throw KeeperException(std::string("Root path in config file should start with '/', but got ") + chroot, Coordination::Error::ZBADARGUMENTS);
|
||||
if (chroot.back() == '/')
|
||||
chroot.pop_back();
|
||||
}
|
||||
|
||||
/// init get_priority_load_balancing
|
||||
get_priority_load_balancing.hostname_differences.resize(hosts.size());
|
||||
const String & local_hostname = getFQDNOrHostName();
|
||||
for (size_t i = 0; i < hosts.size(); ++i)
|
||||
{
|
||||
const String & node_host = hosts[i].substr(0, hosts[i].find_last_of(':'));
|
||||
get_priority_load_balancing.hostname_differences[i] = DB::getHostNameDifference(local_hostname, node_host);
|
||||
}
|
||||
}
|
||||
|
||||
Strings hosts;
|
||||
std::string identity;
|
||||
int session_timeout_ms;
|
||||
int operation_timeout_ms;
|
||||
std::string chroot;
|
||||
std::string implementation;
|
||||
GetPriorityForLoadBalancing get_priority_load_balancing;
|
||||
};
|
||||
|
||||
ZooKeeper::ZooKeeper(const Poco::Util::AbstractConfiguration & config, const std::string & config_name, std::shared_ptr<DB::ZooKeeperLog> zk_log_)
|
||||
: zk_log(std::move(zk_log_))
|
||||
{
|
||||
ZooKeeperArgs args(config, config_name);
|
||||
init(args.implementation, args.hosts, args.identity, args.session_timeout_ms, args.operation_timeout_ms, args.chroot, args.get_priority_load_balancing);
|
||||
}
|
||||
|
||||
bool ZooKeeper::configChanged(const Poco::Util::AbstractConfiguration & config, const std::string & config_name) const
|
||||
{
|
||||
ZooKeeperArgs args(config, config_name);
|
||||
ZooKeeperArgs new_args(config, config_name);
|
||||
|
||||
// skip reload testkeeper cause it's for test and data in memory
|
||||
if (args.implementation == implementation && implementation == "testkeeper")
|
||||
if (new_args.implementation == args.implementation && args.implementation == "testkeeper")
|
||||
return false;
|
||||
|
||||
if (args.get_priority_load_balancing != get_priority_load_balancing)
|
||||
return true;
|
||||
|
||||
return std::tie(args.implementation, args.hosts, args.identity, args.session_timeout_ms, args.operation_timeout_ms, args.chroot, args.get_priority_load_balancing)
|
||||
!= std::tie(implementation, hosts, identity, session_timeout_ms, operation_timeout_ms, chroot, args.get_priority_load_balancing);
|
||||
return args != new_args;
|
||||
}
|
||||
|
||||
|
||||
@ -318,7 +203,7 @@ Coordination::Error ZooKeeper::getChildrenImpl(const std::string & path, Strings
|
||||
{
|
||||
auto future_result = asyncTryGetChildrenNoThrow(path, watch_callback, list_request_type);
|
||||
|
||||
if (future_result.wait_for(std::chrono::milliseconds(operation_timeout_ms)) != std::future_status::ready)
|
||||
if (future_result.wait_for(std::chrono::milliseconds(args.operation_timeout_ms)) != std::future_status::ready)
|
||||
{
|
||||
impl->finalize(fmt::format("Operation timeout on {} {}", toString(Coordination::OpNum::List), path));
|
||||
return Coordination::Error::ZOPERATIONTIMEOUT;
|
||||
@ -385,7 +270,7 @@ Coordination::Error ZooKeeper::createImpl(const std::string & path, const std::s
|
||||
{
|
||||
auto future_result = asyncTryCreateNoThrow(path, data, mode);
|
||||
|
||||
if (future_result.wait_for(std::chrono::milliseconds(operation_timeout_ms)) != std::future_status::ready)
|
||||
if (future_result.wait_for(std::chrono::milliseconds(args.operation_timeout_ms)) != std::future_status::ready)
|
||||
{
|
||||
impl->finalize(fmt::format("Operation timeout on {} {}", toString(Coordination::OpNum::Create), path));
|
||||
return Coordination::Error::ZOPERATIONTIMEOUT;
|
||||
@ -455,7 +340,7 @@ Coordination::Error ZooKeeper::removeImpl(const std::string & path, int32_t vers
|
||||
auto future_result = asyncTryRemoveNoThrow(path, version);
|
||||
|
||||
|
||||
if (future_result.wait_for(std::chrono::milliseconds(operation_timeout_ms)) != std::future_status::ready)
|
||||
if (future_result.wait_for(std::chrono::milliseconds(args.operation_timeout_ms)) != std::future_status::ready)
|
||||
{
|
||||
impl->finalize(fmt::format("Operation timeout on {} {}", toString(Coordination::OpNum::Remove), path));
|
||||
return Coordination::Error::ZOPERATIONTIMEOUT;
|
||||
@ -487,7 +372,7 @@ Coordination::Error ZooKeeper::existsImpl(const std::string & path, Coordination
|
||||
{
|
||||
auto future_result = asyncTryExistsNoThrow(path, watch_callback);
|
||||
|
||||
if (future_result.wait_for(std::chrono::milliseconds(operation_timeout_ms)) != std::future_status::ready)
|
||||
if (future_result.wait_for(std::chrono::milliseconds(args.operation_timeout_ms)) != std::future_status::ready)
|
||||
{
|
||||
impl->finalize(fmt::format("Operation timeout on {} {}", toString(Coordination::OpNum::Exists), path));
|
||||
return Coordination::Error::ZOPERATIONTIMEOUT;
|
||||
@ -521,7 +406,7 @@ Coordination::Error ZooKeeper::getImpl(const std::string & path, std::string & r
|
||||
{
|
||||
auto future_result = asyncTryGetNoThrow(path, watch_callback);
|
||||
|
||||
if (future_result.wait_for(std::chrono::milliseconds(operation_timeout_ms)) != std::future_status::ready)
|
||||
if (future_result.wait_for(std::chrono::milliseconds(args.operation_timeout_ms)) != std::future_status::ready)
|
||||
{
|
||||
impl->finalize(fmt::format("Operation timeout on {} {}", toString(Coordination::OpNum::Get), path));
|
||||
return Coordination::Error::ZOPERATIONTIMEOUT;
|
||||
@ -593,7 +478,7 @@ Coordination::Error ZooKeeper::setImpl(const std::string & path, const std::stri
|
||||
{
|
||||
auto future_result = asyncTrySetNoThrow(path, data, version);
|
||||
|
||||
if (future_result.wait_for(std::chrono::milliseconds(operation_timeout_ms)) != std::future_status::ready)
|
||||
if (future_result.wait_for(std::chrono::milliseconds(args.operation_timeout_ms)) != std::future_status::ready)
|
||||
{
|
||||
impl->finalize(fmt::format("Operation timeout on {} {}", toString(Coordination::OpNum::Set), path));
|
||||
return Coordination::Error::ZOPERATIONTIMEOUT;
|
||||
@ -645,7 +530,7 @@ Coordination::Error ZooKeeper::multiImpl(const Coordination::Requests & requests
|
||||
|
||||
auto future_result = asyncTryMultiNoThrow(requests);
|
||||
|
||||
if (future_result.wait_for(std::chrono::milliseconds(operation_timeout_ms)) != std::future_status::ready)
|
||||
if (future_result.wait_for(std::chrono::milliseconds(args.operation_timeout_ms)) != std::future_status::ready)
|
||||
{
|
||||
impl->finalize(fmt::format("Operation timeout on {} {}", toString(Coordination::OpNum::Multi), requests[0]->getPath()));
|
||||
return Coordination::Error::ZOPERATIONTIMEOUT;
|
||||
@ -679,7 +564,7 @@ Coordination::Error ZooKeeper::syncImpl(const std::string & path, std::string &
|
||||
{
|
||||
auto future_result = asyncTrySyncNoThrow(path);
|
||||
|
||||
if (future_result.wait_for(std::chrono::milliseconds(operation_timeout_ms)) != std::future_status::ready)
|
||||
if (future_result.wait_for(std::chrono::milliseconds(args.operation_timeout_ms)) != std::future_status::ready)
|
||||
{
|
||||
impl->finalize(fmt::format("Operation timeout on {} {}", toString(Coordination::OpNum::Sync), path));
|
||||
return Coordination::Error::ZOPERATIONTIMEOUT;
|
||||
@ -884,7 +769,7 @@ void ZooKeeper::waitForEphemeralToDisappearIfAny(const std::string & path)
|
||||
if (!tryGet(path, content, nullptr, eph_node_disappeared))
|
||||
return;
|
||||
|
||||
int32_t timeout_ms = 3 * session_timeout_ms;
|
||||
int32_t timeout_ms = 3 * args.session_timeout_ms;
|
||||
if (!eph_node_disappeared->tryWait(timeout_ms))
|
||||
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR,
|
||||
"Ephemeral node {} still exists after {}s, probably it's owned by someone else. "
|
||||
@ -894,7 +779,7 @@ void ZooKeeper::waitForEphemeralToDisappearIfAny(const std::string & path)
|
||||
|
||||
ZooKeeperPtr ZooKeeper::startNewSession() const
|
||||
{
|
||||
return std::make_shared<ZooKeeper>(hosts, identity, session_timeout_ms, operation_timeout_ms, chroot, implementation, zk_log, get_priority_load_balancing);
|
||||
return std::make_shared<ZooKeeper>(args, zk_log);
|
||||
}
|
||||
|
||||
|
||||
|
@ -13,7 +13,7 @@
|
||||
#include <Common/Stopwatch.h>
|
||||
#include <Common/ZooKeeper/IKeeper.h>
|
||||
#include <Common/ZooKeeper/ZooKeeperConstants.h>
|
||||
#include <Common/GetPriorityForLoadBalancing.h>
|
||||
#include <Common/ZooKeeper/ZooKeeperArgs.h>
|
||||
#include <Common/thread_local_rng.h>
|
||||
#include <unistd.h>
|
||||
#include <random>
|
||||
@ -72,24 +72,11 @@ using GetPriorityForLoadBalancing = DB::GetPriorityForLoadBalancing;
|
||||
class ZooKeeper
|
||||
{
|
||||
public:
|
||||
|
||||
using Ptr = std::shared_ptr<ZooKeeper>;
|
||||
|
||||
/// hosts_string -- comma separated [secure://]host:port list
|
||||
explicit ZooKeeper(const std::string & hosts_string, const std::string & identity_ = "",
|
||||
int32_t session_timeout_ms_ = Coordination::DEFAULT_SESSION_TIMEOUT_MS,
|
||||
int32_t operation_timeout_ms_ = Coordination::DEFAULT_OPERATION_TIMEOUT_MS,
|
||||
const std::string & chroot_ = "",
|
||||
const std::string & implementation_ = "zookeeper",
|
||||
std::shared_ptr<DB::ZooKeeperLog> zk_log_ = nullptr,
|
||||
const GetPriorityForLoadBalancing & get_priority_load_balancing_ = {});
|
||||
ZooKeeper(const ZooKeeperArgs & args_, std::shared_ptr<DB::ZooKeeperLog> zk_log_ = nullptr);
|
||||
|
||||
explicit ZooKeeper(const Strings & hosts_, const std::string & identity_ = "",
|
||||
int32_t session_timeout_ms_ = Coordination::DEFAULT_SESSION_TIMEOUT_MS,
|
||||
int32_t operation_timeout_ms_ = Coordination::DEFAULT_OPERATION_TIMEOUT_MS,
|
||||
const std::string & chroot_ = "",
|
||||
const std::string & implementation_ = "zookeeper",
|
||||
std::shared_ptr<DB::ZooKeeperLog> zk_log_ = nullptr,
|
||||
const GetPriorityForLoadBalancing & get_priority_load_balancing_ = {});
|
||||
|
||||
/** Config of the form:
|
||||
<zookeeper>
|
||||
@ -337,8 +324,7 @@ public:
|
||||
private:
|
||||
friend class EphemeralNodeHolder;
|
||||
|
||||
void init(const std::string & implementation_, const Strings & hosts_, const std::string & identity_,
|
||||
int32_t session_timeout_ms_, int32_t operation_timeout_ms_, const std::string & chroot_, const GetPriorityForLoadBalancing & get_priority_load_balancing_);
|
||||
void init(ZooKeeperArgs args_);
|
||||
|
||||
/// The following methods don't any throw exceptions but return error codes.
|
||||
Coordination::Error createImpl(const std::string & path, const std::string & data, int32_t mode, std::string & path_created);
|
||||
@ -358,20 +344,13 @@ private:
|
||||
|
||||
std::unique_ptr<Coordination::IKeeper> impl;
|
||||
|
||||
Strings hosts;
|
||||
std::string identity;
|
||||
int32_t session_timeout_ms;
|
||||
int32_t operation_timeout_ms;
|
||||
std::string chroot;
|
||||
std::string implementation;
|
||||
ZooKeeperArgs args;
|
||||
|
||||
std::mutex mutex;
|
||||
|
||||
Poco::Logger * log = nullptr;
|
||||
std::shared_ptr<DB::ZooKeeperLog> zk_log;
|
||||
|
||||
GetPriorityForLoadBalancing get_priority_load_balancing;
|
||||
|
||||
AtomicStopwatch session_uptime;
|
||||
};
|
||||
|
||||
|
108
src/Common/ZooKeeper/ZooKeeperArgs.cpp
Normal file
108
src/Common/ZooKeeper/ZooKeeperArgs.cpp
Normal file
@ -0,0 +1,108 @@
|
||||
#include <Common/ZooKeeper/ZooKeeperArgs.h>
|
||||
#include <Common/ZooKeeper/KeeperException.h>
|
||||
#include <base/find_symbols.h>
|
||||
#include <base/getFQDNOrHostName.h>
|
||||
#include <Poco/Util/AbstractConfiguration.h>
|
||||
#include <Common/isLocalAddress.h>
|
||||
#include <Poco/String.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int BAD_ARGUMENTS;
|
||||
}
|
||||
}
|
||||
|
||||
namespace zkutil
|
||||
{
|
||||
|
||||
ZooKeeperArgs::ZooKeeperArgs(const Poco::Util::AbstractConfiguration & config, const String & config_name)
|
||||
{
|
||||
Poco::Util::AbstractConfiguration::Keys keys;
|
||||
config.keys(config_name, keys);
|
||||
|
||||
for (const auto & key : keys)
|
||||
{
|
||||
if (key.starts_with("node"))
|
||||
{
|
||||
hosts.push_back(
|
||||
(config.getBool(config_name + "." + key + ".secure", false) ? "secure://" : "")
|
||||
+ config.getString(config_name + "." + key + ".host") + ":" + config.getString(config_name + "." + key + ".port", "2181"));
|
||||
}
|
||||
else if (key == "session_timeout_ms")
|
||||
{
|
||||
session_timeout_ms = config.getInt(config_name + "." + key);
|
||||
}
|
||||
else if (key == "operation_timeout_ms")
|
||||
{
|
||||
operation_timeout_ms = config.getInt(config_name + "." + key);
|
||||
}
|
||||
else if (key == "connection_timeout_ms")
|
||||
{
|
||||
connection_timeout_ms = config.getInt(config_name + "." + key);
|
||||
}
|
||||
else if (key == "send_fault_probability")
|
||||
{
|
||||
send_fault_probability = config.getDouble(config_name + "." + key);
|
||||
}
|
||||
else if (key == "recv_fault_probability")
|
||||
{
|
||||
recv_fault_probability = config.getDouble(config_name + "." + key);
|
||||
}
|
||||
else if (key == "identity")
|
||||
{
|
||||
identity = config.getString(config_name + "." + key);
|
||||
if (!identity.empty())
|
||||
auth_scheme = "digest";
|
||||
}
|
||||
else if (key == "root")
|
||||
{
|
||||
chroot = config.getString(config_name + "." + key);
|
||||
}
|
||||
else if (key == "implementation")
|
||||
{
|
||||
implementation = config.getString(config_name + "." + key);
|
||||
}
|
||||
else if (key == "zookeeper_load_balancing")
|
||||
{
|
||||
String load_balancing_str = config.getString(config_name + "." + key);
|
||||
/// Use magic_enum to avoid dependency from dbms (`SettingFieldLoadBalancingTraits::fromString(...)`)
|
||||
auto load_balancing = magic_enum::enum_cast<DB::LoadBalancing>(Poco::toUpper(load_balancing_str));
|
||||
if (!load_balancing)
|
||||
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unknown load balancing: {}", load_balancing_str);
|
||||
get_priority_load_balancing.load_balancing = *load_balancing;
|
||||
}
|
||||
else
|
||||
throw KeeperException(std::string("Unknown key ") + key + " in config file", Coordination::Error::ZBADARGUMENTS);
|
||||
}
|
||||
|
||||
if (!chroot.empty())
|
||||
{
|
||||
if (chroot.front() != '/')
|
||||
throw KeeperException(
|
||||
Coordination::Error::ZBADARGUMENTS,
|
||||
"Root path in config file should start with '/', but got {}", chroot);
|
||||
if (chroot.back() == '/')
|
||||
chroot.pop_back();
|
||||
}
|
||||
|
||||
if (session_timeout_ms < 0 || operation_timeout_ms < 0 || connection_timeout_ms < 0)
|
||||
throw KeeperException("Timeout cannot be negative", Coordination::Error::ZBADARGUMENTS);
|
||||
|
||||
/// init get_priority_load_balancing
|
||||
get_priority_load_balancing.hostname_differences.resize(hosts.size());
|
||||
const String & local_hostname = getFQDNOrHostName();
|
||||
for (size_t i = 0; i < hosts.size(); ++i)
|
||||
{
|
||||
const String & node_host = hosts[i].substr(0, hosts[i].find_last_of(':'));
|
||||
get_priority_load_balancing.hostname_differences[i] = DB::getHostNameDifference(local_hostname, node_host);
|
||||
}
|
||||
}
|
||||
|
||||
ZooKeeperArgs::ZooKeeperArgs(const String & hosts_string)
|
||||
{
|
||||
splitInto<','>(hosts, hosts_string);
|
||||
}
|
||||
|
||||
}
|
37
src/Common/ZooKeeper/ZooKeeperArgs.h
Normal file
37
src/Common/ZooKeeper/ZooKeeperArgs.h
Normal file
@ -0,0 +1,37 @@
|
||||
#pragma once
|
||||
#include <Common/ZooKeeper/Types.h>
|
||||
#include <Common/ZooKeeper/ZooKeeperConstants.h>
|
||||
#include <Common/GetPriorityForLoadBalancing.h>
|
||||
|
||||
namespace Poco::Util
|
||||
{
|
||||
class AbstractConfiguration;
|
||||
}
|
||||
|
||||
namespace zkutil
|
||||
{
|
||||
|
||||
struct ZooKeeperArgs
|
||||
{
|
||||
ZooKeeperArgs(const Poco::Util::AbstractConfiguration & config, const String & config_name);
|
||||
|
||||
/// hosts_string -- comma separated [secure://]host:port list
|
||||
ZooKeeperArgs(const String & hosts_string);
|
||||
ZooKeeperArgs() = default;
|
||||
bool operator == (const ZooKeeperArgs &) const = default;
|
||||
|
||||
String implementation = "zookeeper";
|
||||
Strings hosts;
|
||||
String auth_scheme;
|
||||
String identity;
|
||||
String chroot;
|
||||
int32_t connection_timeout_ms = Coordination::DEFAULT_CONNECTION_TIMEOUT_MS;
|
||||
int32_t session_timeout_ms = Coordination::DEFAULT_SESSION_TIMEOUT_MS;
|
||||
int32_t operation_timeout_ms = Coordination::DEFAULT_OPERATION_TIMEOUT_MS;
|
||||
float send_fault_probability = 0;
|
||||
float recv_fault_probability = 0;
|
||||
|
||||
DB::GetPriorityForLoadBalancing get_priority_load_balancing;
|
||||
};
|
||||
|
||||
}
|
@ -56,5 +56,6 @@ static constexpr int32_t DEFAULT_SESSION_TIMEOUT_MS = 30000;
|
||||
static constexpr int32_t DEFAULT_MIN_SESSION_TIMEOUT_MS = 10000;
|
||||
static constexpr int32_t DEFAULT_MAX_SESSION_TIMEOUT_MS = 100000;
|
||||
static constexpr int32_t DEFAULT_OPERATION_TIMEOUT_MS = 10000;
|
||||
static constexpr int32_t DEFAULT_CONNECTION_TIMEOUT_MS = 1000;
|
||||
|
||||
}
|
||||
|
@ -276,15 +276,15 @@ void ZooKeeper::read(T & x)
|
||||
Coordination::read(x, *in);
|
||||
}
|
||||
|
||||
static void removeRootPath(String & path, const String & root_path)
|
||||
static void removeRootPath(String & path, const String & chroot)
|
||||
{
|
||||
if (root_path.empty())
|
||||
if (chroot.empty())
|
||||
return;
|
||||
|
||||
if (path.size() <= root_path.size())
|
||||
throw Exception("Received path is not longer than root_path", Error::ZDATAINCONSISTENCY);
|
||||
if (path.size() <= chroot.size())
|
||||
throw Exception(Error::ZDATAINCONSISTENCY, "Received path is not longer than chroot");
|
||||
|
||||
path = path.substr(root_path.size());
|
||||
path = path.substr(chroot.size());
|
||||
}
|
||||
|
||||
ZooKeeper::~ZooKeeper()
|
||||
@ -308,27 +308,20 @@ ZooKeeper::~ZooKeeper()
|
||||
|
||||
ZooKeeper::ZooKeeper(
|
||||
const Nodes & nodes,
|
||||
const String & root_path_,
|
||||
const String & auth_scheme,
|
||||
const String & auth_data,
|
||||
Poco::Timespan session_timeout_,
|
||||
Poco::Timespan connection_timeout,
|
||||
Poco::Timespan operation_timeout_,
|
||||
const zkutil::ZooKeeperArgs & args_,
|
||||
std::shared_ptr<ZooKeeperLog> zk_log_)
|
||||
: root_path(root_path_),
|
||||
session_timeout(session_timeout_),
|
||||
operation_timeout(std::min(operation_timeout_, session_timeout_))
|
||||
: args(args_)
|
||||
{
|
||||
log = &Poco::Logger::get("ZooKeeperClient");
|
||||
std::atomic_store(&zk_log, std::move(zk_log_));
|
||||
|
||||
if (!root_path.empty())
|
||||
if (!args.chroot.empty())
|
||||
{
|
||||
if (root_path.back() == '/')
|
||||
root_path.pop_back();
|
||||
if (args.chroot.back() == '/')
|
||||
args.chroot.pop_back();
|
||||
}
|
||||
|
||||
if (auth_scheme.empty())
|
||||
if (args.auth_scheme.empty())
|
||||
{
|
||||
ACL acl;
|
||||
acl.permissions = ACL::All;
|
||||
@ -345,10 +338,22 @@ ZooKeeper::ZooKeeper(
|
||||
default_acls.emplace_back(std::move(acl));
|
||||
}
|
||||
|
||||
connect(nodes, connection_timeout);
|
||||
|
||||
if (!auth_scheme.empty())
|
||||
sendAuth(auth_scheme, auth_data);
|
||||
/// It makes sense (especially, for async requests) to inject a fault in two places:
|
||||
/// pushRequest (before request is sent) and receiveEvent (after request was executed).
|
||||
if (0 < args.send_fault_probability && args.send_fault_probability <= 1)
|
||||
{
|
||||
send_inject_fault.emplace(args.send_fault_probability);
|
||||
}
|
||||
if (0 < args.recv_fault_probability && args.recv_fault_probability <= 1)
|
||||
{
|
||||
recv_inject_fault.emplace(args.recv_fault_probability);
|
||||
}
|
||||
|
||||
connect(nodes, args.connection_timeout_ms * 1000);
|
||||
|
||||
if (!args.auth_scheme.empty())
|
||||
sendAuth(args.auth_scheme, args.identity);
|
||||
|
||||
send_thread = ThreadFromGlobalPool([this] { sendThread(); });
|
||||
receive_thread = ThreadFromGlobalPool([this] { receiveThread(); });
|
||||
@ -364,7 +369,7 @@ void ZooKeeper::connect(
|
||||
Poco::Timespan connection_timeout)
|
||||
{
|
||||
if (nodes.empty())
|
||||
throw Exception("No nodes passed to ZooKeeper constructor", Error::ZBADARGUMENTS);
|
||||
throw Exception(Error::ZBADARGUMENTS, "No nodes passed to ZooKeeper constructor");
|
||||
|
||||
static constexpr size_t num_tries = 3;
|
||||
bool connected = false;
|
||||
@ -394,8 +399,8 @@ void ZooKeeper::connect(
|
||||
socket.connect(node.address, connection_timeout);
|
||||
socket_address = socket.peerAddress();
|
||||
|
||||
socket.setReceiveTimeout(operation_timeout);
|
||||
socket.setSendTimeout(operation_timeout);
|
||||
socket.setReceiveTimeout(args.operation_timeout_ms * 1000);
|
||||
socket.setSendTimeout(args.operation_timeout_ms * 1000);
|
||||
socket.setNoDelay(true);
|
||||
|
||||
in.emplace(socket);
|
||||
@ -453,7 +458,7 @@ void ZooKeeper::connect(
|
||||
}
|
||||
|
||||
message << fail_reasons.str() << "\n";
|
||||
throw Exception(message.str(), Error::ZCONNECTIONLOSS);
|
||||
throw Exception(Error::ZCONNECTIONLOSS, message.str());
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -466,7 +471,7 @@ void ZooKeeper::sendHandshake()
|
||||
{
|
||||
int32_t handshake_length = 44;
|
||||
int64_t last_zxid_seen = 0;
|
||||
int32_t timeout = session_timeout.totalMilliseconds();
|
||||
int32_t timeout = args.session_timeout_ms;
|
||||
int64_t previous_session_id = 0; /// We don't support session restore. So previous session_id is always zero.
|
||||
constexpr int32_t passwd_len = 16;
|
||||
std::array<char, passwd_len> passwd {};
|
||||
@ -491,7 +496,7 @@ void ZooKeeper::receiveHandshake()
|
||||
|
||||
read(handshake_length);
|
||||
if (handshake_length != SERVER_HANDSHAKE_LENGTH)
|
||||
throw Exception("Unexpected handshake length received: " + DB::toString(handshake_length), Error::ZMARSHALLINGERROR);
|
||||
throw Exception(Error::ZMARSHALLINGERROR, "Unexpected handshake length received: {}", handshake_length);
|
||||
|
||||
read(protocol_version_read);
|
||||
if (protocol_version_read != ZOOKEEPER_PROTOCOL_VERSION)
|
||||
@ -500,15 +505,15 @@ void ZooKeeper::receiveHandshake()
|
||||
/// It's better for faster failover than just connection drop.
|
||||
/// Implemented in clickhouse-keeper.
|
||||
if (protocol_version_read == KEEPER_PROTOCOL_VERSION_CONNECTION_REJECT)
|
||||
throw Exception("Keeper server rejected the connection during the handshake. Possibly it's overloaded, doesn't see leader or stale", Error::ZCONNECTIONLOSS);
|
||||
throw Exception(Error::ZCONNECTIONLOSS, "Keeper server rejected the connection during the handshake. Possibly it's overloaded, doesn't see leader or stale");
|
||||
else
|
||||
throw Exception("Unexpected protocol version: " + DB::toString(protocol_version_read), Error::ZMARSHALLINGERROR);
|
||||
throw Exception(Error::ZMARSHALLINGERROR, "Unexpected protocol version: {}", protocol_version_read);
|
||||
}
|
||||
|
||||
read(timeout);
|
||||
if (timeout != session_timeout.totalMilliseconds())
|
||||
if (timeout != args.session_timeout_ms)
|
||||
/// Use timeout from server.
|
||||
session_timeout = timeout * Poco::Timespan::MILLISECONDS;
|
||||
args.session_timeout_ms = timeout;
|
||||
|
||||
read(session_id);
|
||||
read(passwd);
|
||||
@ -535,17 +540,15 @@ void ZooKeeper::sendAuth(const String & scheme, const String & data)
|
||||
read(err);
|
||||
|
||||
if (read_xid != AUTH_XID)
|
||||
throw Exception("Unexpected event received in reply to auth request: " + DB::toString(read_xid),
|
||||
Error::ZMARSHALLINGERROR);
|
||||
throw Exception(Error::ZMARSHALLINGERROR, "Unexpected event received in reply to auth request: {}", read_xid);
|
||||
|
||||
int32_t actual_length = in->count() - count_before_event;
|
||||
if (length != actual_length)
|
||||
throw Exception("Response length doesn't match. Expected: " + DB::toString(length) + ", actual: " + DB::toString(actual_length),
|
||||
Error::ZMARSHALLINGERROR);
|
||||
throw Exception(Error::ZMARSHALLINGERROR, "Response length doesn't match. Expected: {}, actual: {}", length, actual_length);
|
||||
|
||||
if (err != Error::ZOK)
|
||||
throw Exception("Error received in reply to auth request. Code: " + DB::toString(static_cast<int32_t>(err)) + ". Message: " + String(errorMessage(err)),
|
||||
Error::ZMARSHALLINGERROR);
|
||||
throw Exception(Error::ZMARSHALLINGERROR, "Error received in reply to auth request. Code: {}. Message: {}",
|
||||
static_cast<int32_t>(err), errorMessage(err));
|
||||
}
|
||||
|
||||
|
||||
@ -562,14 +565,14 @@ void ZooKeeper::sendThread()
|
||||
auto prev_bytes_sent = out->count();
|
||||
|
||||
auto now = clock::now();
|
||||
auto next_heartbeat_time = prev_heartbeat_time + std::chrono::milliseconds(session_timeout.totalMilliseconds() / 3);
|
||||
auto next_heartbeat_time = prev_heartbeat_time + std::chrono::milliseconds(args.session_timeout_ms / 3);
|
||||
|
||||
if (next_heartbeat_time > now)
|
||||
{
|
||||
/// Wait for the next request in queue. No more than operation timeout. No more than until next heartbeat time.
|
||||
UInt64 max_wait = std::min(
|
||||
static_cast<UInt64>(std::chrono::duration_cast<std::chrono::milliseconds>(next_heartbeat_time - now).count()),
|
||||
static_cast<UInt64>(operation_timeout.totalMilliseconds()));
|
||||
static_cast<UInt64>(args.operation_timeout_ms));
|
||||
|
||||
RequestInfo info;
|
||||
if (requests_queue.tryPop(info, max_wait))
|
||||
@ -594,7 +597,7 @@ void ZooKeeper::sendThread()
|
||||
break;
|
||||
}
|
||||
|
||||
info.request->addRootPath(root_path);
|
||||
info.request->addRootPath(args.chroot);
|
||||
|
||||
info.request->probably_sent = true;
|
||||
info.request->write(*out);
|
||||
@ -633,13 +636,13 @@ void ZooKeeper::receiveThread()
|
||||
|
||||
try
|
||||
{
|
||||
Int64 waited = 0;
|
||||
Int64 waited_us = 0;
|
||||
while (!requests_queue.isFinished())
|
||||
{
|
||||
auto prev_bytes_received = in->count();
|
||||
|
||||
clock::time_point now = clock::now();
|
||||
UInt64 max_wait = operation_timeout.totalMicroseconds();
|
||||
UInt64 max_wait_us = args.operation_timeout_ms * 1000;
|
||||
std::optional<RequestInfo> earliest_operation;
|
||||
|
||||
{
|
||||
@ -648,30 +651,32 @@ void ZooKeeper::receiveThread()
|
||||
{
|
||||
/// Operations are ordered by xid (and consequently, by time).
|
||||
earliest_operation = operations.begin()->second;
|
||||
auto earliest_operation_deadline = earliest_operation->time + std::chrono::microseconds(operation_timeout.totalMicroseconds());
|
||||
auto earliest_operation_deadline = earliest_operation->time + std::chrono::microseconds(args.operation_timeout_ms * 1000);
|
||||
if (now > earliest_operation_deadline)
|
||||
throw Exception("Operation timeout (deadline already expired) for path: " + earliest_operation->request->getPath(), Error::ZOPERATIONTIMEOUT);
|
||||
max_wait = std::chrono::duration_cast<std::chrono::microseconds>(earliest_operation_deadline - now).count();
|
||||
throw Exception(Error::ZOPERATIONTIMEOUT, "Operation timeout (deadline already expired) for path: {}",
|
||||
earliest_operation->request->getPath());
|
||||
max_wait_us = std::chrono::duration_cast<std::chrono::microseconds>(earliest_operation_deadline - now).count();
|
||||
}
|
||||
}
|
||||
|
||||
if (in->poll(max_wait))
|
||||
if (in->poll(max_wait_us))
|
||||
{
|
||||
if (requests_queue.isFinished())
|
||||
break;
|
||||
|
||||
receiveEvent();
|
||||
waited = 0;
|
||||
waited_us = 0;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (earliest_operation)
|
||||
{
|
||||
throw Exception("Operation timeout (no response) for request " + toString(earliest_operation->request->getOpNum()) + " for path: " + earliest_operation->request->getPath(), Error::ZOPERATIONTIMEOUT);
|
||||
throw Exception(Error::ZOPERATIONTIMEOUT, "Operation timeout (no response) for request {} for path: {}",
|
||||
earliest_operation->request->getOpNum(), earliest_operation->request->getPath());
|
||||
}
|
||||
waited += max_wait;
|
||||
if (waited >= session_timeout.totalMicroseconds())
|
||||
throw Exception("Nothing is received in session timeout", Error::ZOPERATIONTIMEOUT);
|
||||
waited_us += max_wait_us;
|
||||
if (waited_us >= args.session_timeout_ms * 1000)
|
||||
throw Exception(Error::ZOPERATIONTIMEOUT, "Nothing is received in session timeout");
|
||||
|
||||
}
|
||||
|
||||
@ -703,10 +708,13 @@ void ZooKeeper::receiveEvent()
|
||||
ZooKeeperResponsePtr response;
|
||||
UInt64 elapsed_ms = 0;
|
||||
|
||||
if (unlikely(recv_inject_fault) && recv_inject_fault.value()(thread_local_rng))
|
||||
throw Exception(Error::ZSESSIONEXPIRED, "Session expired (fault injected on recv)");
|
||||
|
||||
if (xid == PING_XID)
|
||||
{
|
||||
if (err != Error::ZOK)
|
||||
throw Exception("Received error in heartbeat response: " + String(errorMessage(err)), Error::ZRUNTIMEINCONSISTENCY);
|
||||
throw Exception(Error::ZRUNTIMEINCONSISTENCY, "Received error in heartbeat response: {}", errorMessage(err));
|
||||
|
||||
response = std::make_shared<ZooKeeperHeartbeatResponse>();
|
||||
}
|
||||
@ -781,7 +789,7 @@ void ZooKeeper::receiveEvent()
|
||||
else
|
||||
{
|
||||
response->readImpl(*in);
|
||||
response->removeRootPath(root_path);
|
||||
response->removeRootPath(args.chroot);
|
||||
}
|
||||
/// Instead of setting the watch in sendEvent, set it in receiveEvent because we need to check the response.
|
||||
/// The watch shouldn't be set if the node does not exist and never will exist (as with sequential ephemeral nodes).
|
||||
@ -801,9 +809,9 @@ void ZooKeeper::receiveEvent()
|
||||
{
|
||||
CurrentMetrics::add(CurrentMetrics::ZooKeeperWatch);
|
||||
|
||||
/// The key of watches should exclude the root_path
|
||||
/// The key of watches should exclude the args.chroot
|
||||
String req_path = request_info.request->getPath();
|
||||
removeRootPath(req_path, root_path);
|
||||
removeRootPath(req_path, args.chroot);
|
||||
std::lock_guard lock(watches_mutex);
|
||||
watches[req_path].emplace_back(std::move(request_info.watch));
|
||||
}
|
||||
@ -811,7 +819,7 @@ void ZooKeeper::receiveEvent()
|
||||
|
||||
int32_t actual_length = in->count() - count_before_event;
|
||||
if (length != actual_length)
|
||||
throw Exception("Response length doesn't match. Expected: " + DB::toString(length) + ", actual: " + DB::toString(actual_length), Error::ZMARSHALLINGERROR);
|
||||
throw Exception(Error::ZMARSHALLINGERROR, "Response length doesn't match. Expected: {}, actual: {}", length, actual_length);
|
||||
|
||||
logOperationIfNeeded(request_info.request, response, /* finalize= */ false, elapsed_ms); //-V614
|
||||
}
|
||||
@ -1035,9 +1043,9 @@ void ZooKeeper::pushRequest(RequestInfo && info)
|
||||
{
|
||||
info.request->xid = next_xid.fetch_add(1);
|
||||
if (info.request->xid == CLOSE_XID)
|
||||
throw Exception("xid equal to close_xid", Error::ZSESSIONEXPIRED);
|
||||
throw Exception(Error::ZSESSIONEXPIRED, "xid equal to close_xid");
|
||||
if (info.request->xid < 0)
|
||||
throw Exception("XID overflow", Error::ZSESSIONEXPIRED);
|
||||
throw Exception(Error::ZSESSIONEXPIRED, "XID overflow");
|
||||
|
||||
if (auto * multi_request = dynamic_cast<ZooKeeperMultiRequest *>(info.request.get()))
|
||||
{
|
||||
@ -1046,12 +1054,15 @@ void ZooKeeper::pushRequest(RequestInfo && info)
|
||||
}
|
||||
}
|
||||
|
||||
if (!requests_queue.tryPush(std::move(info), operation_timeout.totalMilliseconds()))
|
||||
if (unlikely(send_inject_fault) && send_inject_fault.value()(thread_local_rng))
|
||||
throw Exception(Error::ZSESSIONEXPIRED, "Session expired (fault injected on send)");
|
||||
|
||||
if (!requests_queue.tryPush(std::move(info), args.operation_timeout_ms))
|
||||
{
|
||||
if (requests_queue.isFinished())
|
||||
throw Exception("Session expired", Error::ZSESSIONEXPIRED);
|
||||
throw Exception(Error::ZSESSIONEXPIRED, "Session expired");
|
||||
|
||||
throw Exception("Cannot push request to queue within operation timeout", Error::ZOPERATIONTIMEOUT);
|
||||
throw Exception(Error::ZOPERATIONTIMEOUT, "Cannot push request to queue within operation timeout");
|
||||
}
|
||||
}
|
||||
catch (...)
|
||||
@ -1079,7 +1090,7 @@ void ZooKeeper::initApiVersion()
|
||||
};
|
||||
|
||||
get(keeper_api_version_path, std::move(callback), {});
|
||||
if (future.wait_for(std::chrono::milliseconds(operation_timeout.totalMilliseconds())) != std::future_status::ready)
|
||||
if (future.wait_for(std::chrono::milliseconds(args.operation_timeout_ms)) != std::future_status::ready)
|
||||
{
|
||||
LOG_TRACE(log, "Failed to get API version: timeout");
|
||||
return;
|
||||
@ -1220,7 +1231,7 @@ void ZooKeeper::list(
|
||||
if (keeper_api_version < Coordination::KeeperApiVersion::WITH_FILTERED_LIST)
|
||||
{
|
||||
if (list_request_type != ListRequestType::ALL)
|
||||
throw Exception("Filtered list request type cannot be used because it's not supported by the server", Error::ZBADARGUMENTS);
|
||||
throw Exception(Error::ZBADARGUMENTS, "Filtered list request type cannot be used because it's not supported by the server");
|
||||
|
||||
request = std::make_shared<ZooKeeperListRequest>();
|
||||
}
|
||||
@ -1299,8 +1310,8 @@ void ZooKeeper::close()
|
||||
RequestInfo request_info;
|
||||
request_info.request = std::make_shared<ZooKeeperCloseRequest>(std::move(request));
|
||||
|
||||
if (!requests_queue.tryPush(std::move(request_info), operation_timeout.totalMilliseconds()))
|
||||
throw Exception("Cannot push close request to queue within operation timeout", Error::ZOPERATIONTIMEOUT);
|
||||
if (!requests_queue.tryPush(std::move(request_info), args.operation_timeout_ms))
|
||||
throw Exception(Error::ZOPERATIONTIMEOUT, "Cannot push close request to queue within operation timeout");
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::ZooKeeperClose);
|
||||
}
|
||||
|
@ -7,6 +7,7 @@
|
||||
#include <Common/ThreadPool.h>
|
||||
#include <Common/ZooKeeper/IKeeper.h>
|
||||
#include <Common/ZooKeeper/ZooKeeperCommon.h>
|
||||
#include <Common/ZooKeeper/ZooKeeperArgs.h>
|
||||
#include <Coordination/KeeperConstants.h>
|
||||
|
||||
#include <IO/ReadBuffer.h>
|
||||
@ -27,6 +28,7 @@
|
||||
#include <cstdint>
|
||||
#include <optional>
|
||||
#include <functional>
|
||||
#include <random>
|
||||
|
||||
|
||||
/** ZooKeeper C++ library, a replacement for libzookeeper.
|
||||
@ -111,12 +113,7 @@ public:
|
||||
*/
|
||||
ZooKeeper(
|
||||
const Nodes & nodes,
|
||||
const String & root_path,
|
||||
const String & auth_scheme,
|
||||
const String & auth_data,
|
||||
Poco::Timespan session_timeout_,
|
||||
Poco::Timespan connection_timeout,
|
||||
Poco::Timespan operation_timeout_,
|
||||
const zkutil::ZooKeeperArgs & args_,
|
||||
std::shared_ptr<ZooKeeperLog> zk_log_);
|
||||
|
||||
~ZooKeeper() override;
|
||||
@ -201,11 +198,12 @@ public:
|
||||
void setZooKeeperLog(std::shared_ptr<DB::ZooKeeperLog> zk_log_);
|
||||
|
||||
private:
|
||||
String root_path;
|
||||
ACLs default_acls;
|
||||
|
||||
Poco::Timespan session_timeout;
|
||||
Poco::Timespan operation_timeout;
|
||||
zkutil::ZooKeeperArgs args;
|
||||
|
||||
std::optional<std::bernoulli_distribution> send_inject_fault;
|
||||
std::optional<std::bernoulli_distribution> recv_inject_fault;
|
||||
|
||||
Poco::Net::StreamSocket socket;
|
||||
/// To avoid excessive getpeername(2) calls.
|
||||
|
@ -5,7 +5,7 @@
|
||||
int main(int argc, char ** argv)
|
||||
try
|
||||
{
|
||||
zkutil::ZooKeeper zookeeper{"localhost:2181"};
|
||||
zkutil::ZooKeeper zookeeper{zkutil::ZooKeeperArgs("localhost:2181")};
|
||||
|
||||
auto nodes = zookeeper.getChildren("/tmp");
|
||||
|
||||
|
@ -16,7 +16,7 @@ try
|
||||
return 1;
|
||||
}
|
||||
|
||||
ZooKeeper zk(argv[1], "", 5000);
|
||||
ZooKeeper zk{zkutil::ZooKeeperArgs(argv[1])};
|
||||
|
||||
std::cout << "create path" << std::endl;
|
||||
zk.create("/test", "old", zkutil::CreateMode::Persistent);
|
||||
|
@ -40,7 +40,8 @@ try
|
||||
}
|
||||
|
||||
|
||||
ZooKeeper zk(nodes, {}, {}, {}, {5, 0}, {0, 50000}, {0, 50000}, nullptr);
|
||||
zkutil::ZooKeeperArgs args;
|
||||
ZooKeeper zk(nodes, args, nullptr);
|
||||
|
||||
Poco::Event event(true);
|
||||
|
||||
|
19
tests/config/config.d/zookeeper_fault_injection.xml
Normal file
19
tests/config/config.d/zookeeper_fault_injection.xml
Normal file
@ -0,0 +1,19 @@
|
||||
<clickhouse>
|
||||
<zookeeper>
|
||||
<node index="1">
|
||||
<host>localhost</host>
|
||||
<port>9181</port>
|
||||
</node>
|
||||
|
||||
<!-- Settings for fault injection.
|
||||
Approximate probability of request success:
|
||||
(1 - send_fault_probability) * (1 - recv_fault_probability) = 0.99998 * 0.99998 = 0.99996
|
||||
Actually it will be less, because if some request fails due to fault injection,
|
||||
then all requests which are in the queue now also fail.
|
||||
In other words, the session will expire about 4 times per 99996 successful requests
|
||||
or approximately once every 25000 requests (on average).
|
||||
-->
|
||||
<send_fault_probability>0.00002</send_fault_probability>
|
||||
<recv_fault_probability>0.00002</recv_fault_probability>
|
||||
</zookeeper>
|
||||
</clickhouse>
|
@ -15,7 +15,6 @@ mkdir -p $DEST_SERVER_PATH/config.d/
|
||||
mkdir -p $DEST_SERVER_PATH/users.d/
|
||||
mkdir -p $DEST_CLIENT_PATH
|
||||
|
||||
ln -sf $SRC_PATH/config.d/zookeeper.xml $DEST_SERVER_PATH/config.d/
|
||||
ln -sf $SRC_PATH/config.d/zookeeper_write.xml $DEST_SERVER_PATH/config.d/
|
||||
ln -sf $SRC_PATH/config.d/listen.xml $DEST_SERVER_PATH/config.d/
|
||||
ln -sf $SRC_PATH/config.d/text_log.xml $DEST_SERVER_PATH/config.d/
|
||||
@ -89,6 +88,12 @@ ln -sf $SRC_PATH/dhparam.pem $DEST_SERVER_PATH/
|
||||
ln -sf --backup=simple --suffix=_original.xml \
|
||||
$SRC_PATH/config.d/query_masking_rules.xml $DEST_SERVER_PATH/config.d/
|
||||
|
||||
if [[ -n "$ZOOKEEPER_FAULT_INJECTION" ]] && [[ "$ZOOKEEPER_FAULT_INJECTION" -eq 1 ]]; then
|
||||
ln -sf $SRC_PATH/config.d/zookeeper_fault_injection.xml $DEST_SERVER_PATH/config.d/
|
||||
else
|
||||
ln -sf $SRC_PATH/config.d/zookeeper.xml $DEST_SERVER_PATH/config.d/
|
||||
fi
|
||||
|
||||
# We randomize creating the snapshot on exit for Keeper to test out using older snapshots
|
||||
create_snapshot_on_exit=$(($RANDOM % 2))
|
||||
sed --follow-symlinks -i "s|<create_snapshot_on_exit>true</create_snapshot_on_exit>|<create_snapshot_on_exit>$create_snapshot_on_exit</create_snapshot_on_exit>|" $DEST_SERVER_PATH/config.d/keeper_port.xml
|
||||
|
@ -203,16 +203,11 @@ std::vector<std::shared_ptr<Coordination::ZooKeeper>> Runner::getConnections()
|
||||
Coordination::ZooKeeper::Node node{Poco::Net::SocketAddress{host_string}, false};
|
||||
std::vector<Coordination::ZooKeeper::Node> nodes;
|
||||
nodes.push_back(node);
|
||||
zookeepers.emplace_back(std::make_shared<Coordination::ZooKeeper>(
|
||||
nodes,
|
||||
"", /*chroot*/
|
||||
"", /*identity type*/
|
||||
"", /*identity*/
|
||||
Poco::Timespan(0, 30000 * 1000),
|
||||
Poco::Timespan(0, 1000 * 1000),
|
||||
Poco::Timespan(0, 10000 * 1000),
|
||||
nullptr));
|
||||
|
||||
zkutil::ZooKeeperArgs args;
|
||||
args.session_timeout_ms = 30000;
|
||||
args.connection_timeout_ms = 1000;
|
||||
args.operation_timeout_ms = 10000;
|
||||
zookeepers.emplace_back(std::make_shared<Coordination::ZooKeeper>(nodes, args, nullptr));
|
||||
}
|
||||
|
||||
|
||||
|
@ -69,7 +69,7 @@ int main(int argc, char ** argv)
|
||||
Poco::Logger::root().setChannel(channel);
|
||||
Poco::Logger::root().setLevel("trace");
|
||||
|
||||
zkutil::ZooKeeper zk(argv[1]);
|
||||
zkutil::ZooKeeper zk{zkutil::ZooKeeperArgs(argv[1])};
|
||||
LineReader lr({}, false, {"\\"}, {});
|
||||
|
||||
do
|
||||
|
Loading…
Reference in New Issue
Block a user