refactor ctor arguments

This commit is contained in:
Alexander Tokmakov 2021-10-21 13:44:52 +03:00
parent 49685f0b9a
commit 6e296d0342
12 changed files with 203 additions and 223 deletions

View File

@ -474,15 +474,15 @@ ResponsePtr TestKeeperCheckRequest::createResponse() const { return std::make_sh
ResponsePtr TestKeeperMultiRequest::createResponse() const { return std::make_shared<MultiResponse>(); }
TestKeeper::TestKeeper(const String & root_path_, Poco::Timespan operation_timeout_)
: root_path(root_path_), operation_timeout(operation_timeout_)
TestKeeper::TestKeeper(const zkutil::ZooKeeperArgs & args_)
: args(args_)
{
container.emplace("/", Node());
if (!root_path.empty())
if (!args.chroot.empty())
{
if (root_path.back() == '/')
root_path.pop_back();
if (args.chroot.back() == '/')
args.chroot.pop_back();
}
processing_thread = ThreadFromGlobalPool([this] { processingThread(); });
@ -514,7 +514,7 @@ void TestKeeper::processingThread()
{
RequestInfo info;
UInt64 max_wait = UInt64(operation_timeout.totalMilliseconds());
UInt64 max_wait = UInt64(args.operation_timeout_ms);
if (requests_queue.tryPop(info, max_wait))
{
if (expired)
@ -523,7 +523,7 @@ void TestKeeper::processingThread()
++zxid;
info.request->addRootPath(root_path);
info.request->addRootPath(args.chroot);
auto [response, _] = info.request->process(container, zxid);
if (info.watch)
@ -547,7 +547,7 @@ void TestKeeper::processingThread()
if (response->error == Error::ZOK)
info.request->processWatches(watches, list_watches);
response->removeRootPath(root_path);
response->removeRootPath(args.chroot);
if (info.callback)
info.callback(*response);
}
@ -656,7 +656,7 @@ void TestKeeper::pushRequest(RequestInfo && request)
if (expired)
throw Exception("Session expired", Error::ZSESSIONEXPIRED);
if (!requests_queue.tryPush(std::move(request), operation_timeout.totalMilliseconds()))
if (!requests_queue.tryPush(std::move(request), args.operation_timeout_ms))
throw Exception("Cannot push request to queue within operation timeout", Error::ZOPERATIONTIMEOUT);
}
catch (...)

View File

@ -8,6 +8,7 @@
#include <Poco/Timespan.h>
#include <Common/ZooKeeper/IKeeper.h>
#include <Common/ZooKeeper/ZooKeeperArgs.h>
#include <Common/ThreadPool.h>
#include <Common/ConcurrentBoundedQueue.h>
@ -33,7 +34,7 @@ using TestKeeperRequestPtr = std::shared_ptr<TestKeeperRequest>;
class TestKeeper final : public IKeeper
{
public:
TestKeeper(const String & root_path_, Poco::Timespan operation_timeout_);
TestKeeper(const zkutil::ZooKeeperArgs & args_);
~TestKeeper() override;
bool isExpired() const override { return expired; }
@ -113,10 +114,7 @@ private:
Container container;
String root_path;
ACLs default_acls;
Poco::Timespan operation_timeout;
zkutil::ZooKeeperArgs args;
std::mutex push_request_mutex;
std::atomic<bool> expired{false};

View File

@ -8,7 +8,6 @@
#include <pcg-random/pcg_random.hpp>
#include <base/logger_useful.h>
#include <base/find_symbols.h>
#include <Common/randomSeed.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/Exception.h>
@ -16,8 +15,6 @@
#include <Poco/Net/NetException.h>
#define ZOOKEEPER_CONNECTION_TIMEOUT_MS 1000
namespace fs = std::filesystem;
namespace DB
@ -46,26 +43,20 @@ static void check(Coordination::Error code, const std::string & path)
}
void ZooKeeper::init(const std::string & implementation_, const Strings & hosts_, const std::string & identity_,
int32_t session_timeout_ms_, int32_t operation_timeout_ms_, const std::string & chroot_)
void ZooKeeper::init(ZooKeeperArgs args_)
{
args = std::move(args_);
log = &Poco::Logger::get("ZooKeeper");
hosts = hosts_;
identity = identity_;
session_timeout_ms = session_timeout_ms_;
operation_timeout_ms = operation_timeout_ms_;
chroot = chroot_;
implementation = implementation_;
if (implementation == "zookeeper")
if (args.implementation == "zookeeper")
{
if (hosts.empty())
if (args.hosts.empty())
throw KeeperException("No hosts passed to ZooKeeper constructor.", Coordination::Error::ZBADARGUMENTS);
Coordination::ZooKeeper::Nodes nodes;
nodes.reserve(hosts.size());
nodes.reserve(args.hosts.size());
Strings shuffled_hosts = hosts;
Strings shuffled_hosts = args.hosts;
/// Shuffle the hosts to distribute the load among ZooKeeper nodes.
pcg64 generator(randomSeed());
std::shuffle(shuffled_hosts.begin(), shuffled_hosts.end(), generator);
@ -104,33 +95,23 @@ void ZooKeeper::init(const std::string & implementation_, const Strings & hosts_
throw KeeperException("Cannot use any of provided ZooKeeper nodes", Coordination::Error::ZBADARGUMENTS);
}
impl = std::make_unique<Coordination::ZooKeeper>(
nodes,
chroot,
identity_.empty() ? "" : "digest",
identity_,
Poco::Timespan(0, session_timeout_ms_ * 1000),
Poco::Timespan(0, ZOOKEEPER_CONNECTION_TIMEOUT_MS * 1000),
Poco::Timespan(0, operation_timeout_ms_ * 1000),
zk_log);
impl = std::make_unique<Coordination::ZooKeeper>(nodes, args, zk_log);
if (chroot.empty())
LOG_TRACE(log, "Initialized, hosts: {}", fmt::join(hosts, ","));
if (args.chroot.empty())
LOG_TRACE(log, "Initialized, hosts: {}", fmt::join(args.hosts, ","));
else
LOG_TRACE(log, "Initialized, hosts: {}, chroot: {}", fmt::join(hosts, ","), chroot);
LOG_TRACE(log, "Initialized, hosts: {}, chroot: {}", fmt::join(args.hosts, ","), chroot);
}
else if (implementation == "testkeeper")
else if (args.implementation == "testkeeper")
{
impl = std::make_unique<Coordination::TestKeeper>(
chroot,
Poco::Timespan(0, operation_timeout_ms_ * 1000));
impl = std::make_unique<Coordination::TestKeeper>(args);
}
else
{
throw DB::Exception("Unknown implementation of coordination service: " + implementation, DB::ErrorCodes::NOT_IMPLEMENTED);
throw DB::Exception("Unknown implementation of coordination service: " + args.implementation, DB::ErrorCodes::NOT_IMPLEMENTED);
}
if (!chroot.empty())
if (!args.chroot.empty())
{
/// Here we check that zk root exists.
/// This check is clumsy. The reason is we do this request under common mutex, and never want to hung here.
@ -140,7 +121,7 @@ void ZooKeeper::init(const std::string & implementation_, const Strings & hosts_
/// This should not happen now, when memory tracker is disabled.
/// But let's keep it just in case (it is also easy to backport).
auto future = asyncExists("/");
auto res = future.wait_for(std::chrono::milliseconds(operation_timeout_ms));
auto res = future.wait_for(std::chrono::milliseconds(args.operation_timeout_ms));
if (res != std::future_status::ready)
throw KeeperException("Cannot check if zookeeper root exists.", Coordination::Error::ZOPERATIONTIMEOUT);
@ -149,107 +130,32 @@ void ZooKeeper::init(const std::string & implementation_, const Strings & hosts_
throw KeeperException(code, "/");
if (code == Coordination::Error::ZNONODE)
throw KeeperException("ZooKeeper root doesn't exist. You should create root node " + chroot + " before start.", Coordination::Error::ZNONODE);
throw KeeperException("ZooKeeper root doesn't exist. You should create root node " + args.chroot + " before start.", Coordination::Error::ZNONODE);
}
}
ZooKeeper::ZooKeeper(const std::string & hosts_string, const std::string & identity_, int32_t session_timeout_ms_,
int32_t operation_timeout_ms_, const std::string & chroot_, const std::string & implementation_,
std::shared_ptr<DB::ZooKeeperLog> zk_log_)
ZooKeeper::ZooKeeper(const ZooKeeperArgs & args_, std::shared_ptr<DB::ZooKeeperLog> zk_log_)
{
zk_log = std::move(zk_log_);
Strings hosts_strings;
splitInto<','>(hosts_strings, hosts_string);
init(implementation_, hosts_strings, identity_, session_timeout_ms_, operation_timeout_ms_, chroot_);
init(args_);
}
ZooKeeper::ZooKeeper(const Strings & hosts_, const std::string & identity_, int32_t session_timeout_ms_,
int32_t operation_timeout_ms_, const std::string & chroot_, const std::string & implementation_,
std::shared_ptr<DB::ZooKeeperLog> zk_log_)
{
zk_log = std::move(zk_log_);
init(implementation_, hosts_, identity_, session_timeout_ms_, operation_timeout_ms_, chroot_);
}
struct ZooKeeperArgs
{
ZooKeeperArgs(const Poco::Util::AbstractConfiguration & config, const std::string & config_name)
{
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_name, keys);
session_timeout_ms = Coordination::DEFAULT_SESSION_TIMEOUT_MS;
operation_timeout_ms = Coordination::DEFAULT_OPERATION_TIMEOUT_MS;
implementation = "zookeeper";
for (const auto & key : keys)
{
if (startsWith(key, "node"))
{
hosts.push_back(
(config.getBool(config_name + "." + key + ".secure", false) ? "secure://" : "") +
config.getString(config_name + "." + key + ".host") + ":"
+ config.getString(config_name + "." + key + ".port", "2181")
);
}
else if (key == "session_timeout_ms")
{
session_timeout_ms = config.getInt(config_name + "." + key);
}
else if (key == "operation_timeout_ms")
{
operation_timeout_ms = config.getInt(config_name + "." + key);
}
else if (key == "identity")
{
identity = config.getString(config_name + "." + key);
}
else if (key == "root")
{
chroot = config.getString(config_name + "." + key);
}
else if (key == "implementation")
{
implementation = config.getString(config_name + "." + key);
}
else
throw KeeperException(std::string("Unknown key ") + key + " in config file", Coordination::Error::ZBADARGUMENTS);
}
if (!chroot.empty())
{
if (chroot.front() != '/')
throw KeeperException(std::string("Root path in config file should start with '/', but got ") + chroot, Coordination::Error::ZBADARGUMENTS);
if (chroot.back() == '/')
chroot.pop_back();
}
}
Strings hosts;
std::string identity;
int session_timeout_ms;
int operation_timeout_ms;
std::string chroot;
std::string implementation;
};
ZooKeeper::ZooKeeper(const Poco::Util::AbstractConfiguration & config, const std::string & config_name, std::shared_ptr<DB::ZooKeeperLog> zk_log_)
: zk_log(std::move(zk_log_))
{
ZooKeeperArgs args(config, config_name);
init(args.implementation, args.hosts, args.identity, args.session_timeout_ms, args.operation_timeout_ms, args.chroot);
init(ZooKeeperArgs(config, config_name));
}
bool ZooKeeper::configChanged(const Poco::Util::AbstractConfiguration & config, const std::string & config_name) const
{
ZooKeeperArgs args(config, config_name);
ZooKeeperArgs new_args(config, config_name);
// skip reload testkeeper cause it's for test and data in memory
if (args.implementation == implementation && implementation == "testkeeper")
if (new_args.implementation == args.implementation && args.implementation == "testkeeper")
return false;
return std::tie(args.implementation, args.hosts, args.identity, args.session_timeout_ms, args.operation_timeout_ms, args.chroot)
!= std::tie(implementation, hosts, identity, session_timeout_ms, operation_timeout_ms, chroot);
return args != new_args;
}
@ -267,7 +173,7 @@ Coordination::Error ZooKeeper::getChildrenImpl(const std::string & path, Strings
{
auto future_result = asyncTryGetChildrenNoThrow(path, watch_callback);
if (future_result.wait_for(std::chrono::milliseconds(operation_timeout_ms)) != std::future_status::ready)
if (future_result.wait_for(std::chrono::milliseconds(args.operation_timeout_ms)) != std::future_status::ready)
{
impl->finalize(fmt::format("Operation timeout on {} {}", toString(Coordination::OpNum::List), path));
return Coordination::Error::ZOPERATIONTIMEOUT;
@ -328,7 +234,7 @@ Coordination::Error ZooKeeper::createImpl(const std::string & path, const std::s
{
auto future_result = asyncTryCreateNoThrow(path, data, mode);
if (future_result.wait_for(std::chrono::milliseconds(operation_timeout_ms)) != std::future_status::ready)
if (future_result.wait_for(std::chrono::milliseconds(args.operation_timeout_ms)) != std::future_status::ready)
{
impl->finalize(fmt::format("Operation timeout on {} {}", toString(Coordination::OpNum::Create), path));
return Coordination::Error::ZOPERATIONTIMEOUT;
@ -398,7 +304,7 @@ Coordination::Error ZooKeeper::removeImpl(const std::string & path, int32_t vers
auto future_result = asyncTryRemoveNoThrow(path, version);
if (future_result.wait_for(std::chrono::milliseconds(operation_timeout_ms)) != std::future_status::ready)
if (future_result.wait_for(std::chrono::milliseconds(args.operation_timeout_ms)) != std::future_status::ready)
{
impl->finalize(fmt::format("Operation timeout on {} {}", toString(Coordination::OpNum::Remove), path));
return Coordination::Error::ZOPERATIONTIMEOUT;
@ -430,7 +336,7 @@ Coordination::Error ZooKeeper::existsImpl(const std::string & path, Coordination
{
auto future_result = asyncTryExistsNoThrow(path, watch_callback);
if (future_result.wait_for(std::chrono::milliseconds(operation_timeout_ms)) != std::future_status::ready)
if (future_result.wait_for(std::chrono::milliseconds(args.operation_timeout_ms)) != std::future_status::ready)
{
impl->finalize(fmt::format("Operation timeout on {} {}", toString(Coordination::OpNum::Exists), path));
return Coordination::Error::ZOPERATIONTIMEOUT;
@ -464,7 +370,7 @@ Coordination::Error ZooKeeper::getImpl(const std::string & path, std::string & r
{
auto future_result = asyncTryGetNoThrow(path, watch_callback);
if (future_result.wait_for(std::chrono::milliseconds(operation_timeout_ms)) != std::future_status::ready)
if (future_result.wait_for(std::chrono::milliseconds(args.operation_timeout_ms)) != std::future_status::ready)
{
impl->finalize(fmt::format("Operation timeout on {} {}", toString(Coordination::OpNum::Get), path));
return Coordination::Error::ZOPERATIONTIMEOUT;
@ -537,7 +443,7 @@ Coordination::Error ZooKeeper::setImpl(const std::string & path, const std::stri
{
auto future_result = asyncTrySetNoThrow(path, data, version);
if (future_result.wait_for(std::chrono::milliseconds(operation_timeout_ms)) != std::future_status::ready)
if (future_result.wait_for(std::chrono::milliseconds(args.operation_timeout_ms)) != std::future_status::ready)
{
impl->finalize(fmt::format("Operation timeout on {} {}", toString(Coordination::OpNum::Set), path));
return Coordination::Error::ZOPERATIONTIMEOUT;
@ -589,7 +495,7 @@ Coordination::Error ZooKeeper::multiImpl(const Coordination::Requests & requests
auto future_result = asyncTryMultiNoThrow(requests);
if (future_result.wait_for(std::chrono::milliseconds(operation_timeout_ms)) != std::future_status::ready)
if (future_result.wait_for(std::chrono::milliseconds(args.operation_timeout_ms)) != std::future_status::ready)
{
impl->finalize(fmt::format("Operation timeout on {} {}", toString(Coordination::OpNum::Multi), requests[0]->getPath()));
return Coordination::Error::ZOPERATIONTIMEOUT;
@ -752,7 +658,7 @@ bool ZooKeeper::waitForDisappear(const std::string & path, const WaitCondition &
ZooKeeperPtr ZooKeeper::startNewSession() const
{
return std::make_shared<ZooKeeper>(hosts, identity, session_timeout_ms, operation_timeout_ms, chroot, implementation, zk_log);
return std::make_shared<ZooKeeper>(args, zk_log);
}

View File

@ -13,6 +13,7 @@
#include <Common/Stopwatch.h>
#include <Common/ZooKeeper/IKeeper.h>
#include <Common/ZooKeeper/ZooKeeperConstants.h>
#include <Common/ZooKeeper/ZooKeeperArgs.h>
#include <unistd.h>
@ -50,22 +51,10 @@ constexpr size_t MULTI_BATCH_SIZE = 100;
class ZooKeeper
{
public:
using Ptr = std::shared_ptr<ZooKeeper>;
/// hosts_string -- comma separated [secure://]host:port list
ZooKeeper(const std::string & hosts_string, const std::string & identity_ = "",
int32_t session_timeout_ms_ = Coordination::DEFAULT_SESSION_TIMEOUT_MS,
int32_t operation_timeout_ms_ = Coordination::DEFAULT_OPERATION_TIMEOUT_MS,
const std::string & chroot_ = "",
const std::string & implementation_ = "zookeeper",
std::shared_ptr<DB::ZooKeeperLog> zk_log_ = nullptr);
ZooKeeper(const Strings & hosts_, const std::string & identity_ = "",
int32_t session_timeout_ms_ = Coordination::DEFAULT_SESSION_TIMEOUT_MS,
int32_t operation_timeout_ms_ = Coordination::DEFAULT_OPERATION_TIMEOUT_MS,
const std::string & chroot_ = "",
const std::string & implementation_ = "zookeeper",
std::shared_ptr<DB::ZooKeeperLog> zk_log_ = nullptr);
ZooKeeper(const ZooKeeperArgs & args_, std::shared_ptr<DB::ZooKeeperLog> zk_log_ = nullptr);
/** Config of the form:
<zookeeper>
@ -283,8 +272,7 @@ public:
private:
friend class EphemeralNodeHolder;
void init(const std::string & implementation_, const Strings & hosts_, const std::string & identity_,
int32_t session_timeout_ms_, int32_t operation_timeout_ms_, const std::string & chroot_);
void init(ZooKeeperArgs args_);
/// The following methods don't any throw exceptions but return error codes.
Coordination::Error createImpl(const std::string & path, const std::string & data, int32_t mode, std::string & path_created);
@ -299,12 +287,7 @@ private:
std::unique_ptr<Coordination::IKeeper> impl;
Strings hosts;
std::string identity;
int32_t session_timeout_ms;
int32_t operation_timeout_ms;
std::string chroot;
std::string implementation;
ZooKeeperArgs args;
std::mutex mutex;

View File

@ -0,0 +1,71 @@
#include <Common/ZooKeeper/ZooKeeperArgs.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <base/find_symbols.h>
#include <Poco/Util/AbstractConfiguration.h>
namespace zkutil
{
ZooKeeperArgs::ZooKeeperArgs(const Poco::Util::AbstractConfiguration & config, const String & config_name)
{
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_name, keys);
for (const auto & key : keys)
{
if (key.starts_with("node"))
{
hosts.push_back(
(config.getBool(config_name + "." + key + ".secure", false) ? "secure://" : "")
+ config.getString(config_name + "." + key + ".host") + ":" + config.getString(config_name + "." + key + ".port", "2181"));
}
else if (key == "session_timeout_ms")
{
session_timeout_ms = config.getInt(config_name + "." + key);
}
else if (key == "operation_timeout_ms")
{
operation_timeout_ms = config.getInt(config_name + "." + key);
}
else if (key == "connection_timeout_ms")
{
connection_timeout_ms = config.getInt(config_name + "." + key);
}
else if (key == "session_fault_probability")
{
session_fault_probability = config.getDouble(config_name + "." + key);
}
else if (key == "identity")
{
identity = config.getString(config_name + "." + key);
if (!identity.empty())
auth_scheme = "digest";
}
else if (key == "root")
{
chroot = config.getString(config_name + "." + key);
}
else if (key == "implementation")
{
implementation = config.getString(config_name + "." + key);
}
else
throw KeeperException(std::string("Unknown key ") + key + " in config file", Coordination::Error::ZBADARGUMENTS);
}
if (!chroot.empty())
{
if (chroot.front() != '/')
throw KeeperException(
std::string("Root path in config file should start with '/', but got ") + chroot, Coordination::Error::ZBADARGUMENTS);
if (chroot.back() == '/')
chroot.pop_back();
}
}
ZooKeeperArgs::ZooKeeperArgs(const String & hosts_string)
{
splitInto<','>(hosts, hosts_string);
}
}

View File

@ -0,0 +1,37 @@
#pragma once
#include <Common/ZooKeeper/Types.h>
#include <Common/ZooKeeper/ZooKeeperConstants.h>
namespace Poco
{
namespace Util
{
class AbstractConfiguration;
}
}
namespace zkutil
{
struct ZooKeeperArgs
{
ZooKeeperArgs(const Poco::Util::AbstractConfiguration & config, const String & config_name);
/// hosts_string -- comma separated [secure://]host:port list
ZooKeeperArgs(const String & hosts_string);
ZooKeeperArgs() = default;
bool operator == (const ZooKeeperArgs &) const = default;
bool operator != (const ZooKeeperArgs &) const = default;
String implementation;
Strings hosts;
String auth_scheme;
String identity;
String chroot;
int32_t connection_timeout_ms = Coordination::DEFAULT_CONNECTION_TIMEOUT_MS;
int32_t session_timeout_ms = Coordination::DEFAULT_SESSION_TIMEOUT_MS;
int32_t operation_timeout_ms = Coordination::DEFAULT_OPERATION_TIMEOUT_MS;
float session_fault_probability = 0;
};
}

View File

@ -49,5 +49,6 @@ static constexpr int32_t PASSWORD_LENGTH = 16;
static constexpr int32_t MAX_STRING_OR_ARRAY_SIZE = 1 << 28; /// 256 MiB
static constexpr int32_t DEFAULT_SESSION_TIMEOUT_MS = 30000;
static constexpr int32_t DEFAULT_OPERATION_TIMEOUT_MS = 10000;
static constexpr int32_t DEFAULT_CONNECTION_TIMEOUT_MS = 1000;
}

View File

@ -274,15 +274,15 @@ void ZooKeeper::read(T & x)
Coordination::read(x, *in);
}
static void removeRootPath(String & path, const String & root_path)
static void removeRootPath(String & path, const String & chroot)
{
if (root_path.empty())
if (chroot.empty())
return;
if (path.size() <= root_path.size())
throw Exception("Received path is not longer than root_path", Error::ZDATAINCONSISTENCY);
if (path.size() <= chroot.size())
throw Exception("Received path is not longer than chroot", Error::ZDATAINCONSISTENCY);
path = path.substr(root_path.size());
path = path.substr(chroot.size());
}
ZooKeeper::~ZooKeeper()
@ -306,27 +306,20 @@ ZooKeeper::~ZooKeeper()
ZooKeeper::ZooKeeper(
const Nodes & nodes,
const String & root_path_,
const String & auth_scheme,
const String & auth_data,
Poco::Timespan session_timeout_,
Poco::Timespan connection_timeout,
Poco::Timespan operation_timeout_,
const zkutil::ZooKeeperArgs & args_,
std::shared_ptr<ZooKeeperLog> zk_log_)
: root_path(root_path_),
session_timeout(session_timeout_),
operation_timeout(std::min(operation_timeout_, session_timeout_))
: args(args_)
{
log = &Poco::Logger::get("ZooKeeperClient");
std::atomic_store(&zk_log, std::move(zk_log_));
if (!root_path.empty())
if (!args.chroot.empty())
{
if (root_path.back() == '/')
root_path.pop_back();
if (args.chroot.back() == '/')
args.chroot.pop_back();
}
if (auth_scheme.empty())
if (args.auth_scheme.empty())
{
ACL acl;
acl.permissions = ACL::All;
@ -343,10 +336,10 @@ ZooKeeper::ZooKeeper(
default_acls.emplace_back(std::move(acl));
}
connect(nodes, connection_timeout);
connect(nodes, args.connection_timeout_ms * 1000);
if (!auth_scheme.empty())
sendAuth(auth_scheme, auth_data);
if (!args.auth_scheme.empty())
sendAuth(args.auth_scheme, args.identity);
send_thread = ThreadFromGlobalPool([this] { sendThread(); });
receive_thread = ThreadFromGlobalPool([this] { receiveThread(); });
@ -390,8 +383,8 @@ void ZooKeeper::connect(
socket.connect(node.address, connection_timeout);
socket_address = socket.peerAddress();
socket.setReceiveTimeout(operation_timeout);
socket.setSendTimeout(operation_timeout);
socket.setReceiveTimeout(args.operation_timeout_ms);
socket.setSendTimeout(args.operation_timeout_ms);
socket.setNoDelay(true);
in.emplace(socket);
@ -462,7 +455,7 @@ void ZooKeeper::sendHandshake()
{
int32_t handshake_length = 44;
int64_t last_zxid_seen = 0;
int32_t timeout = session_timeout.totalMilliseconds();
int32_t timeout = args.session_timeout_ms;
int64_t previous_session_id = 0; /// We don't support session restore. So previous session_id is always zero.
constexpr int32_t passwd_len = 16;
std::array<char, passwd_len> passwd {};
@ -494,9 +487,9 @@ void ZooKeeper::receiveHandshake()
throw Exception("Unexpected protocol version: " + DB::toString(protocol_version_read), Error::ZMARSHALLINGERROR);
read(timeout);
if (timeout != session_timeout.totalMilliseconds())
if (timeout != args.session_timeout_ms)
/// Use timeout from server.
session_timeout = timeout * Poco::Timespan::MILLISECONDS;
args.session_timeout_ms = timeout * 1000;
read(session_id);
read(passwd);
@ -550,14 +543,14 @@ void ZooKeeper::sendThread()
auto prev_bytes_sent = out->count();
auto now = clock::now();
auto next_heartbeat_time = prev_heartbeat_time + std::chrono::milliseconds(session_timeout.totalMilliseconds() / 3);
auto next_heartbeat_time = prev_heartbeat_time + std::chrono::milliseconds(args.session_timeout_ms / 3);
if (next_heartbeat_time > now)
{
/// Wait for the next request in queue. No more than operation timeout. No more than until next heartbeat time.
UInt64 max_wait = std::min(
UInt64(std::chrono::duration_cast<std::chrono::milliseconds>(next_heartbeat_time - now).count()),
UInt64(operation_timeout.totalMilliseconds()));
UInt64(args.operation_timeout_ms));
RequestInfo info;
if (requests_queue.tryPop(info, max_wait))
@ -582,7 +575,7 @@ void ZooKeeper::sendThread()
break;
}
info.request->addRootPath(root_path);
info.request->addRootPath(args.chroot);
info.request->probably_sent = true;
info.request->write(*out);
@ -621,13 +614,13 @@ void ZooKeeper::receiveThread()
try
{
Int64 waited = 0;
Int64 waited_us = 0;
while (!requests_queue.isFinished())
{
auto prev_bytes_received = in->count();
clock::time_point now = clock::now();
UInt64 max_wait = operation_timeout.totalMicroseconds();
UInt64 max_wait_us = args.operation_timeout_ms;
std::optional<RequestInfo> earliest_operation;
{
@ -636,20 +629,20 @@ void ZooKeeper::receiveThread()
{
/// Operations are ordered by xid (and consequently, by time).
earliest_operation = operations.begin()->second;
auto earliest_operation_deadline = earliest_operation->time + std::chrono::microseconds(operation_timeout.totalMicroseconds());
auto earliest_operation_deadline = earliest_operation->time + std::chrono::microseconds(args.operation_timeout_ms * 1000);
if (now > earliest_operation_deadline)
throw Exception("Operation timeout (deadline already expired) for path: " + earliest_operation->request->getPath(), Error::ZOPERATIONTIMEOUT);
max_wait = std::chrono::duration_cast<std::chrono::microseconds>(earliest_operation_deadline - now).count();
max_wait_us = std::chrono::duration_cast<std::chrono::microseconds>(earliest_operation_deadline - now).count();
}
}
if (in->poll(max_wait))
if (in->poll(max_wait_us))
{
if (requests_queue.isFinished())
break;
receiveEvent();
waited = 0;
waited_us = 0;
}
else
{
@ -657,8 +650,8 @@ void ZooKeeper::receiveThread()
{
throw Exception("Operation timeout (no response) for request " + toString(earliest_operation->request->getOpNum()) + " for path: " + earliest_operation->request->getPath(), Error::ZOPERATIONTIMEOUT);
}
waited += max_wait;
if (waited >= session_timeout.totalMicroseconds())
waited_us += max_wait_us;
if (waited_us >= args.session_timeout_ms * 1000)
throw Exception("Nothing is received in session timeout", Error::ZOPERATIONTIMEOUT);
}
@ -768,7 +761,7 @@ void ZooKeeper::receiveEvent()
else
{
response->readImpl(*in);
response->removeRootPath(root_path);
response->removeRootPath(args.chroot);
}
/// Instead of setting the watch in sendEvent, set it in receiveEvent because need to check the response.
/// The watch shouldn't be set if the node does not exist and it will never exist like sequential ephemeral nodes.
@ -788,9 +781,9 @@ void ZooKeeper::receiveEvent()
{
CurrentMetrics::add(CurrentMetrics::ZooKeeperWatch);
/// The key of wathces should exclude the root_path
/// The key of wathces should exclude the args.chroot
String req_path = request_info.request->getPath();
removeRootPath(req_path, root_path);
removeRootPath(req_path, args.chroot);
std::lock_guard lock(watches_mutex);
watches[req_path].emplace_back(std::move(request_info.watch));
}
@ -1026,7 +1019,7 @@ void ZooKeeper::pushRequest(RequestInfo && info)
}
}
if (!requests_queue.tryPush(std::move(info), operation_timeout.totalMilliseconds()))
if (!requests_queue.tryPush(std::move(info), args.operation_timeout_ms))
{
if (requests_queue.isFinished())
throw Exception("Session expired", Error::ZSESSIONEXPIRED);
@ -1211,7 +1204,7 @@ void ZooKeeper::close()
RequestInfo request_info;
request_info.request = std::make_shared<ZooKeeperCloseRequest>(std::move(request));
if (!requests_queue.tryPush(std::move(request_info), operation_timeout.totalMilliseconds()))
if (!requests_queue.tryPush(std::move(request_info), args.operation_timeout_ms))
throw Exception("Cannot push close request to queue within operation timeout", Error::ZOPERATIONTIMEOUT);
ProfileEvents::increment(ProfileEvents::ZooKeeperClose);

View File

@ -6,6 +6,7 @@
#include <Common/ThreadPool.h>
#include <Common/ZooKeeper/IKeeper.h>
#include <Common/ZooKeeper/ZooKeeperCommon.h>
#include <Common/ZooKeeper/ZooKeeperArgs.h>
#include <IO/ReadBuffer.h>
#include <IO/WriteBuffer.h>
@ -109,12 +110,7 @@ public:
*/
ZooKeeper(
const Nodes & nodes,
const String & root_path,
const String & auth_scheme,
const String & auth_data,
Poco::Timespan session_timeout_,
Poco::Timespan connection_timeout,
Poco::Timespan operation_timeout_,
const zkutil::ZooKeeperArgs & args_,
std::shared_ptr<ZooKeeperLog> zk_log_);
~ZooKeeper() override;
@ -192,11 +188,10 @@ public:
void setZooKeeperLog(std::shared_ptr<DB::ZooKeeperLog> zk_log_);
private:
String root_path;
ACLs default_acls;
Poco::Timespan session_timeout;
Poco::Timespan operation_timeout;
zkutil::ZooKeeperArgs args;
Poco::Net::StreamSocket socket;
/// To avoid excessive getpeername(2) calls.

View File

@ -174,15 +174,11 @@ std::vector<std::shared_ptr<Coordination::ZooKeeper>> Runner::getConnections()
Coordination::ZooKeeper::Node node{Poco::Net::SocketAddress{host_string}, false};
std::vector<Coordination::ZooKeeper::Node> nodes;
nodes.push_back(node);
zookeepers.emplace_back(std::make_shared<Coordination::ZooKeeper>(
nodes,
"", /*chroot*/
"", /*identity type*/
"", /*identity*/
Poco::Timespan(0, 30000 * 1000),
Poco::Timespan(0, 1000 * 1000),
Poco::Timespan(0, 10000 * 1000),
nullptr));
zkutil::ZooKeeperArgs args;
args.session_timeout_ms = 30000;
args.connection_timeout_ms = 1000;
args.operation_timeout_ms = 10000;
zookeepers.emplace_back(std::make_shared<Coordination::ZooKeeper>(nodes, args, nullptr));
}
return zookeepers;

View File

@ -69,7 +69,7 @@ int main(int argc, char ** argv)
Poco::Logger::root().setChannel(channel);
Poco::Logger::root().setLevel("trace");
zkutil::ZooKeeper zk(argv[1]);
zkutil::ZooKeeper zk{zkutil::ZooKeeperArgs(argv[1])};
LineReader lr({}, false, {"\\"}, {});
do

View File

@ -334,7 +334,7 @@ int main(int argc, char *argv[])
Poco::Logger::root().setChannel(channel);
Poco::Logger::root().setLevel("trace");
zkutil::ZooKeeper zk(argv[1]);
zkutil::ZooKeeper zk{zkutil::ZooKeeperArgs(argv[1])};
try
{