Merge pull request #5643 from yandex/levysh-testkeeper

Cleanup for TestKeeper
This commit is contained in:
alexey-milovidov 2019-06-16 12:10:22 +03:00 committed by GitHub
commit debc9366f5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 961 additions and 84 deletions

View File

@ -1,8 +1,5 @@
<yandex>
<!-- <zookeeper>
<node>
<host>localhost</host>
<port>2181</port>
</node>
</zookeeper>-->
<zookeeper>
<implementation>testkeeper</implementation>
</zookeeper>
</yandex>

View File

@ -19,6 +19,7 @@ namespace ErrorCodes
extern const int STD_EXCEPTION;
extern const int UNKNOWN_EXCEPTION;
extern const int CANNOT_TRUNCATE_FILE;
extern const int NOT_IMPLEMENTED;
}
std::string errnoToString(int code, int e)

View File

@ -0,0 +1,727 @@
#include <Common/ZooKeeper/TestKeeper.h>
#include <Common/setThreadName.h>
#include <Common/StringUtils/StringUtils.h>
#include <Core/Types.h>
#include <sstream>
#include <iomanip>
namespace Coordination
{
static String parentPath(const String & path)
{
auto rslash_pos = path.rfind('/');
if (rslash_pos > 0)
return path.substr(0, rslash_pos);
return "/";
}
static String baseName(const String & path)
{
auto rslash_pos = path.rfind('/');
return path.substr(rslash_pos + 1);
}
struct TestKeeperRequest : virtual Request
{
virtual bool isMutable() const { return false; }
virtual ResponsePtr createResponse() const = 0;
virtual ResponsePtr process(TestKeeper::Container & container, int64_t zxid) const = 0;
virtual void processWatches(TestKeeper::Watches & /*watches*/, TestKeeper::Watches & /*list_watches*/) const {}
};
static void processWatchesImpl(const String & path, TestKeeper::Watches & watches, TestKeeper::Watches & list_watches)
{
WatchResponse watch_response;
watch_response.path = path;
auto it = watches.find(watch_response.path);
if (it != watches.end())
{
for (auto & callback : it->second)
if (callback)
callback(watch_response);
watches.erase(it);
}
WatchResponse watch_list_response;
watch_list_response.path = parentPath(path);
it = list_watches.find(watch_list_response.path);
if (it != list_watches.end())
{
for (auto & callback : it->second)
if (callback)
callback(watch_list_response);
list_watches.erase(it);
}
}
struct TestKeeperCreateRequest final : CreateRequest, TestKeeperRequest
{
TestKeeperCreateRequest() {}
TestKeeperCreateRequest(const CreateRequest & base) : CreateRequest(base) {}
ResponsePtr createResponse() const override;
ResponsePtr process(TestKeeper::Container & container, int64_t zxid) const override;
void processWatches(TestKeeper::Watches & node_watches, TestKeeper::Watches & list_watches) const override
{
processWatchesImpl(getPath(), node_watches, list_watches);
}
};
struct TestKeeperRemoveRequest final : RemoveRequest, TestKeeperRequest
{
TestKeeperRemoveRequest() {}
TestKeeperRemoveRequest(const RemoveRequest & base) : RemoveRequest(base) {}
bool isMutable() const override { return true; }
ResponsePtr createResponse() const override;
ResponsePtr process(TestKeeper::Container & container, int64_t zxid) const override;
void processWatches(TestKeeper::Watches & node_watches, TestKeeper::Watches & list_watches) const override
{
processWatchesImpl(getPath(), node_watches, list_watches);
}
};
struct TestKeeperExistsRequest final : ExistsRequest, TestKeeperRequest
{
ResponsePtr createResponse() const override;
ResponsePtr process(TestKeeper::Container & container, int64_t zxid) const override;
};
struct TestKeeperGetRequest final : GetRequest, TestKeeperRequest
{
TestKeeperGetRequest() {}
ResponsePtr createResponse() const override;
ResponsePtr process(TestKeeper::Container & container, int64_t zxid) const override;
};
struct TestKeeperSetRequest final : SetRequest, TestKeeperRequest
{
TestKeeperSetRequest() {}
TestKeeperSetRequest(const SetRequest & base) : SetRequest(base) {}
bool isMutable() const override { return true; }
ResponsePtr createResponse() const override;
ResponsePtr process(TestKeeper::Container & container, int64_t zxid) const override;
void processWatches(TestKeeper::Watches & node_watches, TestKeeper::Watches & list_watches) const override
{
processWatchesImpl(getPath(), node_watches, list_watches);
}
};
struct TestKeeperListRequest final : ListRequest, TestKeeperRequest
{
ResponsePtr createResponse() const override;
ResponsePtr process(TestKeeper::Container & container, int64_t zxid) const override;
};
struct TestKeeperCheckRequest final : CheckRequest, TestKeeperRequest
{
TestKeeperCheckRequest() {}
TestKeeperCheckRequest(const CheckRequest & base) : CheckRequest(base) {}
ResponsePtr createResponse() const override;
ResponsePtr process(TestKeeper::Container & container, int64_t zxid) const override;
};
struct TestKeeperMultiRequest final : MultiRequest, TestKeeperRequest
{
TestKeeperMultiRequest(const Requests & generic_requests)
{
requests.reserve(generic_requests.size());
for (const auto & generic_request : generic_requests)
{
if (auto * concrete_request_create = dynamic_cast<const CreateRequest *>(generic_request.get()))
{
auto create = std::make_shared<TestKeeperCreateRequest>(*concrete_request_create);
requests.push_back(create);
}
else if (auto * concrete_request_remove = dynamic_cast<const RemoveRequest *>(generic_request.get()))
{
requests.push_back(std::make_shared<TestKeeperRemoveRequest>(*concrete_request_remove));
}
else if (auto * concrete_request_set = dynamic_cast<const SetRequest *>(generic_request.get()))
{
requests.push_back(std::make_shared<TestKeeperSetRequest>(*concrete_request_set));
}
else if (auto * concrete_request_check = dynamic_cast<const CheckRequest *>(generic_request.get()))
{
requests.push_back(std::make_shared<TestKeeperCheckRequest>(*concrete_request_check));
}
else
throw Exception("Illegal command as part of multi ZooKeeper request", ZBADARGUMENTS);
}
}
void processWatches(TestKeeper::Watches & node_watches, TestKeeper::Watches & list_watches) const override
{
for (const auto & generic_request : requests)
dynamic_cast<const TestKeeperRequest &>(*generic_request).processWatches(node_watches, list_watches);
}
ResponsePtr createResponse() const override;
ResponsePtr process(TestKeeper::Container & container, int64_t zxid) const override;
};
ResponsePtr TestKeeperCreateRequest::process(TestKeeper::Container & container, int64_t zxid) const
{
CreateResponse response;
if (container.count(path))
{
response.error = Error::ZNODEEXISTS;
}
else
{
auto it = container.find(parentPath(path));
if (it == container.end())
{
response.error = Error::ZNONODE;
}
else if (it->second.is_ephemeral)
{
response.error = Error::ZNOCHILDRENFOREPHEMERALS;
}
else
{
TestKeeper::Node created_node;
created_node.seq_num = 0;
created_node.stat.czxid = zxid;
created_node.stat.mzxid = zxid;
created_node.stat.ctime = std::chrono::system_clock::now().time_since_epoch() / std::chrono::milliseconds(1);
created_node.stat.mtime = created_node.stat.ctime;
created_node.stat.numChildren = 0;
created_node.stat.dataLength = data.length();
created_node.data = data;
created_node.is_ephemeral = is_ephemeral;
created_node.is_sequental = is_sequential;
std::string path_created = path;
if (is_sequential)
{
auto seq_num = it->second.seq_num;
++it->second.seq_num;
std::stringstream seq_num_str;
seq_num_str << std::setw(10) << std::setfill('0') << seq_num;
path_created += seq_num_str.str();
}
response.path_created = path_created;
container.emplace(std::move(path_created), std::move(created_node));
++it->second.stat.cversion;
++it->second.stat.numChildren;
response.error = Error::ZOK;
}
}
return std::make_shared<CreateResponse>(response);
}
ResponsePtr TestKeeperRemoveRequest::process(TestKeeper::Container & container, int64_t) const
{
RemoveResponse response;
auto it = container.find(path);
if (it == container.end())
{
response.error = Error::ZNONODE;
}
else if (version != -1 && version != it->second.stat.version)
{
response.error = Error::ZBADVERSION;
}
else if (it->second.stat.numChildren)
{
response.error = Error::ZNOTEMPTY;
}
else
{
container.erase(it);
auto & parent = container.at(parentPath(path));
--parent.stat.numChildren;
++parent.stat.cversion;
response.error = Error::ZOK;
}
return std::make_shared<RemoveResponse>(response);
}
ResponsePtr TestKeeperExistsRequest::process(TestKeeper::Container & container, int64_t) const
{
ExistsResponse response;
auto it = container.find(path);
if (it != container.end())
{
response.stat = it->second.stat;
response.error = Error::ZOK;
}
else
{
response.error = Error::ZNONODE;
}
return std::make_shared<ExistsResponse>(response);
}
ResponsePtr TestKeeperGetRequest::process(TestKeeper::Container & container, int64_t) const
{
GetResponse response;
auto it = container.find(path);
if (it == container.end())
{
response.error = Error::ZNONODE;
}
else
{
response.stat = it->second.stat;
response.data = it->second.data;
response.error = Error::ZOK;
}
return std::make_shared<GetResponse>(response);
}
ResponsePtr TestKeeperSetRequest::process(TestKeeper::Container & container, int64_t zxid) const
{
SetResponse response;
auto it = container.find(path);
if (it == container.end())
{
response.error = Error::ZNONODE;
}
else if (version == -1 || version == it->second.stat.version)
{
it->second.data = data;
++it->second.stat.version;
it->second.stat.mzxid = zxid;
it->second.stat.mtime = std::chrono::system_clock::now().time_since_epoch() / std::chrono::milliseconds(1);
it->second.data = data;
++container.at(parentPath(path)).stat.cversion;
response.stat = it->second.stat;
response.error = Error::ZOK;
}
else
{
response.error = Error::ZBADVERSION;
}
return std::make_shared<SetResponse>(response);
}
ResponsePtr TestKeeperListRequest::process(TestKeeper::Container & container, int64_t) const
{
ListResponse response;
auto it = container.find(path);
if (it == container.end())
{
response.error = Error::ZNONODE;
}
else
{
auto path_prefix = path;
if (path_prefix.empty())
throw Exception("Logical error: path cannot be empty", ZSESSIONEXPIRED);
if (path_prefix.back() != '/')
path_prefix += '/';
/// Fairly inefficient.
for (auto child_it = container.upper_bound(path_prefix); child_it != container.end() && startsWith(child_it->first, path_prefix); ++child_it)
if (parentPath(child_it->first) == path)
response.names.emplace_back(baseName(child_it->first));
response.stat = it->second.stat;
response.error = Error::ZOK;
}
return std::make_shared<ListResponse>(response);
}
ResponsePtr TestKeeperCheckRequest::process(TestKeeper::Container & container, int64_t) const
{
CheckResponse response;
auto it = container.find(path);
if (it == container.end())
{
response.error = Error::ZNONODE;
}
else if (version != -1 && version != it->second.stat.version)
{
response.error = Error::ZBADVERSION;
}
else
{
response.error = Error::ZOK;
}
return std::make_shared<CheckResponse>(response);
}
ResponsePtr TestKeeperMultiRequest::process(TestKeeper::Container & container, int64_t zxid) const
{
MultiResponse response;
response.responses.reserve(requests.size());
/// Fairly inefficient.
auto container_copy = container;
try
{
for (const auto & request : requests)
{
const TestKeeperRequest & concrete_request = dynamic_cast<const TestKeeperRequest &>(*request);
auto cur_response = concrete_request.process(container, zxid);
response.responses.emplace_back(cur_response);
if (cur_response->error != Error::ZOK)
{
response.error = cur_response->error;
container = container_copy;
return std::make_shared<MultiResponse>(response);
}
}
response.error = Error::ZOK;
return std::make_shared<MultiResponse>(response);
}
catch (...)
{
container = container_copy;
throw;
}
}
ResponsePtr TestKeeperCreateRequest::createResponse() const { return std::make_shared<CreateResponse>(); }
ResponsePtr TestKeeperRemoveRequest::createResponse() const { return std::make_shared<RemoveResponse>(); }
ResponsePtr TestKeeperExistsRequest::createResponse() const { return std::make_shared<ExistsResponse>(); }
ResponsePtr TestKeeperGetRequest::createResponse() const { return std::make_shared<GetResponse>(); }
ResponsePtr TestKeeperSetRequest::createResponse() const { return std::make_shared<SetResponse>(); }
ResponsePtr TestKeeperListRequest::createResponse() const { return std::make_shared<ListResponse>(); }
ResponsePtr TestKeeperCheckRequest::createResponse() const { return std::make_shared<CheckResponse>(); }
ResponsePtr TestKeeperMultiRequest::createResponse() const { return std::make_shared<MultiResponse>(); }
TestKeeper::TestKeeper(const String & root_path_, Poco::Timespan operation_timeout)
: root_path(root_path_), operation_timeout(operation_timeout)
{
container.emplace("/", Node());
if (!root_path.empty())
{
if (root_path.back() == '/')
root_path.pop_back();
}
processing_thread = ThreadFromGlobalPool([this] { processingThread(); });
}
TestKeeper::~TestKeeper()
{
try
{
finalize();
if (processing_thread.joinable())
processing_thread.join();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
void TestKeeper::processingThread()
{
setThreadName("TestKeeperProc");
try
{
while (!expired)
{
RequestInfo info;
UInt64 max_wait = UInt64(operation_timeout.totalMilliseconds());
if (requests_queue.tryPop(info, max_wait))
{
if (expired)
break;
if (info.watch)
{
auto & watches_type = dynamic_cast<const ListRequest *>(info.request.get())
? list_watches
: watches;
watches_type[info.request->getPath()].emplace_back(std::move(info.watch));
}
++zxid;
info.request->addRootPath(root_path);
ResponsePtr response = info.request->process(container, zxid);
if (response->error == Error::ZOK)
info.request->processWatches(watches, list_watches);
response->removeRootPath(root_path);
if (info.callback)
info.callback(*response);
}
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
finalize();
}
}
void TestKeeper::finalize()
{
{
std::lock_guard lock(push_request_mutex);
if (expired)
return;
expired = true;
}
processing_thread.join();
try
{
{
for (auto & path_watch : watches)
{
WatchResponse response;
response.type = SESSION;
response.state = EXPIRED_SESSION;
response.error = ZSESSIONEXPIRED;
for (auto & callback : path_watch.second)
{
if (callback)
{
try
{
callback(response);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}
}
watches.clear();
}
RequestInfo info;
while (requests_queue.tryPop(info))
{
if (info.callback)
{
ResponsePtr response = info.request->createResponse();
response->error = ZSESSIONEXPIRED;
try
{
info.callback(*response);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
if (info.watch)
{
WatchResponse response;
response.type = SESSION;
response.state = EXPIRED_SESSION;
response.error = ZSESSIONEXPIRED;
try
{
info.watch(response);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
void TestKeeper::pushRequest(RequestInfo && info)
{
try
{
info.time = clock::now();
/// We must serialize 'pushRequest' and 'finalize' (from processingThread) calls
/// to avoid forgotten operations in the queue when session is expired.
/// Invariant: when expired, no new operations will be pushed to the queue in 'pushRequest'
/// and the queue will be drained in 'finalize'.
std::lock_guard lock(push_request_mutex);
if (expired)
throw Exception("Session expired", ZSESSIONEXPIRED);
if (!requests_queue.tryPush(std::move(info), operation_timeout.totalMilliseconds()))
throw Exception("Cannot push request to queue within operation timeout", ZOPERATIONTIMEOUT);
}
catch (...)
{
finalize();
throw;
}
}
void TestKeeper::create(
const String & path,
const String & data,
bool is_ephemeral,
bool is_sequential,
const ACLs &,
CreateCallback callback)
{
TestKeeperCreateRequest request;
request.path = path;
request.data = data;
request.is_ephemeral = is_ephemeral;
request.is_sequential = is_sequential;
RequestInfo request_info;
request_info.request = std::make_shared<TestKeeperCreateRequest>(std::move(request));
request_info.callback = [callback](const Response & response) { callback(dynamic_cast<const CreateResponse &>(response)); };
pushRequest(std::move(request_info));
}
void TestKeeper::remove(
const String & path,
int32_t version,
RemoveCallback callback)
{
TestKeeperRemoveRequest request;
request.path = path;
request.version = version;
RequestInfo request_info;
request_info.request = std::make_shared<TestKeeperRemoveRequest>(std::move(request));
request_info.callback = [callback](const Response & response) { callback(dynamic_cast<const RemoveResponse &>(response)); };
pushRequest(std::move(request_info));
}
void TestKeeper::exists(
const String & path,
ExistsCallback callback,
WatchCallback watch)
{
TestKeeperExistsRequest request;
request.path = path;
RequestInfo request_info;
request_info.request = std::make_shared<TestKeeperExistsRequest>(std::move(request));
request_info.callback = [callback](const Response & response) { callback(dynamic_cast<const ExistsResponse &>(response)); };
request_info.watch = watch;
pushRequest(std::move(request_info));
}
void TestKeeper::get(
const String & path,
GetCallback callback,
WatchCallback watch)
{
TestKeeperGetRequest request;
request.path = path;
RequestInfo request_info;
request_info.request = std::make_shared<TestKeeperGetRequest>(std::move(request));
request_info.callback = [callback](const Response & response) { callback(dynamic_cast<const GetResponse &>(response)); };
request_info.watch = watch;
pushRequest(std::move(request_info));
}
void TestKeeper::set(
const String & path,
const String & data,
int32_t version,
SetCallback callback)
{
TestKeeperSetRequest request;
request.path = path;
request.data = data;
request.version = version;
RequestInfo request_info;
request_info.request = std::make_shared<TestKeeperSetRequest>(std::move(request));
request_info.callback = [callback](const Response & response) { callback(dynamic_cast<const SetResponse &>(response)); };
pushRequest(std::move(request_info));
}
void TestKeeper::list(
const String & path,
ListCallback callback,
WatchCallback watch)
{
TestKeeperListRequest request;
request.path = path;
RequestInfo request_info;
request_info.request = std::make_shared<TestKeeperListRequest>(std::move(request));
request_info.callback = [callback](const Response & response) { callback(dynamic_cast<const ListResponse &>(response)); };
request_info.watch = watch;
pushRequest(std::move(request_info));
}
void TestKeeper::check(
const String & path,
int32_t version,
CheckCallback callback)
{
TestKeeperCheckRequest request;
request.path = path;
request.version = version;
RequestInfo request_info;
request_info.request = std::make_shared<TestKeeperCheckRequest>(std::move(request));
request_info.callback = [callback](const Response & response) { callback(dynamic_cast<const CheckResponse &>(response)); };
pushRequest(std::move(request_info));
}
void TestKeeper::multi(
const Requests & requests,
MultiCallback callback)
{
TestKeeperMultiRequest request(requests);
RequestInfo request_info;
request_info.request = std::make_shared<TestKeeperMultiRequest>(std::move(request));
request_info.callback = [callback](const Response & response) { callback(dynamic_cast<const MultiResponse &>(response)); };
pushRequest(std::move(request_info));
}
}

View File

@ -0,0 +1,143 @@
#pragma once
#include <mutex>
#include <map>
#include <atomic>
#include <thread>
#include <chrono>
#include <Poco/Timespan.h>
#include <Common/ZooKeeper/IKeeper.h>
#include <Common/ThreadPool.h>
#include <Common/ConcurrentBoundedQueue.h>
namespace Coordination
{
struct TestKeeperRequest;
using TestKeeperRequestPtr = std::shared_ptr<TestKeeperRequest>;
/** Looks like ZooKeeper but stores all data in memory of server process.
* All data is not shared between different servers and is lost after server restart.
*
* The only purpose is to more simple testing for interaction with ZooKeeper within a single server.
* This still makes sense, because multiple replicas of a single table can be created on a single server,
* and it is used to test replication logic.
*
* Does not support ACLs. Does not support NULL node values.
*
* NOTE: You can add various failure modes for better testing.
*/
class TestKeeper : public IKeeper
{
public:
TestKeeper(const String & root_path, Poco::Timespan operation_timeout);
~TestKeeper() override;
bool isExpired() const override { return expired; }
int64_t getSessionID() const override { return 0; }
void create(
const String & path,
const String & data,
bool is_ephemeral,
bool is_sequential,
const ACLs & acls,
CreateCallback callback) override;
void remove(
const String & path,
int32_t version,
RemoveCallback callback) override;
void exists(
const String & path,
ExistsCallback callback,
WatchCallback watch) override;
void get(
const String & path,
GetCallback callback,
WatchCallback watch) override;
void set(
const String & path,
const String & data,
int32_t version,
SetCallback callback) override;
void list(
const String & path,
ListCallback callback,
WatchCallback watch) override;
void check(
const String & path,
int32_t version,
CheckCallback callback) override;
void multi(
const Requests & requests,
MultiCallback callback) override;
struct Node
{
String data;
ACLs acls;
bool is_ephemeral = false;
bool is_sequental = false;
Stat stat{};
int32_t seq_num = 0;
};
using Container = std::map<std::string, Node>;
using WatchCallbacks = std::vector<WatchCallback>;
using Watches = std::map<String /* path, relative of root_path */, WatchCallbacks>;
private:
using clock = std::chrono::steady_clock;
struct RequestInfo
{
TestKeeperRequestPtr request;
ResponseCallback callback;
WatchCallback watch;
clock::time_point time;
};
Container container;
String root_path;
ACLs default_acls;
Poco::Timespan operation_timeout;
std::mutex push_request_mutex;
std::atomic<bool> expired{false};
int64_t zxid = 0;
Watches watches;
Watches list_watches; /// Watches for 'list' request (watches on children).
void createWatchCallBack(const String & path);
using RequestsQueue = ConcurrentBoundedQueue<RequestInfo>;
RequestsQueue requests_queue{1};
void pushRequest(RequestInfo && request);
void finalize();
ThreadFromGlobalPool processing_thread;
void processingThread();
};
}

View File

@ -1,6 +1,7 @@
#include "ZooKeeper.h"
#include "ZooKeeperImpl.h"
#include "KeeperException.h"
#include "TestKeeper.h"
#include <random>
#include <pcg_random.hpp>
@ -24,6 +25,7 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int NOT_IMPLEMENTED;
}
}
@ -44,7 +46,7 @@ static void check(int32_t code, const std::string & path)
}
void ZooKeeper::init(const std::string & hosts_, const std::string & identity_,
void ZooKeeper::init(const std::string & implementation, const std::string & hosts_, const std::string & identity_,
int32_t session_timeout_ms_, int32_t operation_timeout_ms_, const std::string & chroot_)
{
log = &Logger::get("ZooKeeper");
@ -54,48 +56,61 @@ void ZooKeeper::init(const std::string & hosts_, const std::string & identity_,
operation_timeout_ms = operation_timeout_ms_;
chroot = chroot_;
if (hosts.empty())
throw KeeperException("No addresses passed to ZooKeeper constructor.", Coordination::ZBADARGUMENTS);
std::vector<std::string> addresses_strings;
boost::split(addresses_strings, hosts, boost::is_any_of(","));
Coordination::ZooKeeper::Addresses addresses;
addresses.reserve(addresses_strings.size());
for (const auto & address_string : addresses_strings)
if (implementation == "zookeeper")
{
try
if (hosts.empty())
throw KeeperException("No addresses passed to ZooKeeper constructor.", Coordination::ZBADARGUMENTS);
std::vector<std::string> addresses_strings;
boost::split(addresses_strings, hosts, boost::is_any_of(","));
Coordination::ZooKeeper::Addresses addresses;
addresses.reserve(addresses_strings.size());
for (const auto &address_string : addresses_strings)
{
addresses.emplace_back(address_string);
}
catch (const Poco::Net::DNSException & e)
{
LOG_ERROR(log, "Cannot use ZooKeeper address " << address_string << ", reason: " << e.displayText());
try
{
addresses.emplace_back(address_string);
}
catch (const Poco::Net::DNSException &e)
{
LOG_ERROR(log, "Cannot use ZooKeeper address " << address_string << ", reason: " << e.displayText());
}
}
if (addresses.empty())
throw KeeperException("Cannot use any of provided ZooKeeper addresses", Coordination::ZBADARGUMENTS);
impl = std::make_unique<Coordination::ZooKeeper>(
addresses,
chroot,
identity_.empty() ? "" : "digest",
identity_,
Poco::Timespan(0, session_timeout_ms_ * 1000),
Poco::Timespan(0, ZOOKEEPER_CONNECTION_TIMEOUT_MS * 1000),
Poco::Timespan(0, operation_timeout_ms_ * 1000));
LOG_TRACE(log, "initialized, hosts: " << hosts << (chroot.empty() ? "" : ", chroot: " + chroot));
}
else if (implementation == "testkeeper")
{
impl = std::make_unique<Coordination::TestKeeper>(
chroot,
Poco::Timespan(0, operation_timeout_ms_ * 1000));
}
else
{
throw DB::Exception("Unknown implementation of coordination service: " + implementation, DB::ErrorCodes::NOT_IMPLEMENTED);
}
if (addresses.empty())
throw KeeperException("Cannot use any of provided ZooKeeper addresses", Coordination::ZBADARGUMENTS);
impl = std::make_unique<Coordination::ZooKeeper>(
addresses,
chroot,
identity_.empty() ? "" : "digest",
identity_,
Poco::Timespan(0, session_timeout_ms_ * 1000),
Poco::Timespan(0, ZOOKEEPER_CONNECTION_TIMEOUT_MS * 1000),
Poco::Timespan(0, operation_timeout_ms_ * 1000));
LOG_TRACE(log, "initialized, hosts: " << hosts << (chroot.empty() ? "" : ", chroot: " + chroot));
if (!chroot.empty() && !exists("/"))
throw KeeperException("Zookeeper root doesn't exist. You should create root node " + chroot + " before start.", Coordination::ZNONODE);
}
ZooKeeper::ZooKeeper(const std::string & hosts, const std::string & identity,
int32_t session_timeout_ms, int32_t operation_timeout_ms, const std::string & chroot)
ZooKeeper::ZooKeeper(const std::string & hosts, const std::string & identity, int32_t session_timeout_ms,
int32_t operation_timeout_ms, const std::string & chroot, const std::string & implementation)
{
init(hosts, identity, session_timeout_ms, operation_timeout_ms, chroot);
init(implementation, hosts, identity, session_timeout_ms, operation_timeout_ms, chroot);
}
struct ZooKeeperArgs
@ -109,13 +124,14 @@ struct ZooKeeperArgs
session_timeout_ms = DEFAULT_SESSION_TIMEOUT;
operation_timeout_ms = DEFAULT_OPERATION_TIMEOUT;
implementation = "zookeeper";
for (const auto & key : keys)
{
if (startsWith(key, "node"))
{
hosts_strings.push_back(
config.getString(config_name + "." + key + ".host") + ":"
+ config.getString(config_name + "." + key + ".port", "2181")
config.getString(config_name + "." + key + ".host") + ":"
+ config.getString(config_name + "." + key + ".port", "2181")
);
}
else if (key == "session_timeout_ms")
@ -134,6 +150,10 @@ struct ZooKeeperArgs
{
chroot = config.getString(config_name + "." + key);
}
else if (key == "implementation")
{
implementation = config.getString(config_name + "." + key);
}
else
throw KeeperException(std::string("Unknown key ") + key + " in config file", Coordination::ZBADARGUMENTS);
}
@ -163,12 +183,13 @@ struct ZooKeeperArgs
int session_timeout_ms;
int operation_timeout_ms;
std::string chroot;
std::string implementation;
};
ZooKeeper::ZooKeeper(const Poco::Util::AbstractConfiguration & config, const std::string & config_name)
{
ZooKeeperArgs args(config, config_name);
init(args.hosts, args.identity, args.session_timeout_ms, args.operation_timeout_ms, args.chroot);
init(args.implementation, args.hosts, args.identity, args.session_timeout_ms, args.operation_timeout_ms, args.chroot);
}
@ -181,8 +202,8 @@ static Coordination::WatchCallback callbackForEvent(const EventPtr & watch)
int32_t ZooKeeper::getChildrenImpl(const std::string & path, Strings & res,
Coordination::Stat * stat,
Coordination::WatchCallback watch_callback)
Coordination::Stat * stat,
Coordination::WatchCallback watch_callback)
{
int32_t code = 0;
Poco::Event event;
@ -205,7 +226,7 @@ int32_t ZooKeeper::getChildrenImpl(const std::string & path, Strings & res,
}
Strings ZooKeeper::getChildren(
const std::string & path, Coordination::Stat * stat, const EventPtr & watch)
const std::string & path, Coordination::Stat * stat, const EventPtr & watch)
{
Strings res;
check(tryGetChildren(path, res, stat, watch), path);
@ -213,7 +234,7 @@ Strings ZooKeeper::getChildren(
}
Strings ZooKeeper::getChildrenWatch(
const std::string & path, Coordination::Stat * stat, Coordination::WatchCallback watch_callback)
const std::string & path, Coordination::Stat * stat, Coordination::WatchCallback watch_callback)
{
Strings res;
check(tryGetChildrenWatch(path, res, stat, watch_callback), path);
@ -221,7 +242,7 @@ Strings ZooKeeper::getChildrenWatch(
}
int32_t ZooKeeper::tryGetChildren(const std::string & path, Strings & res,
Coordination::Stat * stat, const EventPtr & watch)
Coordination::Stat * stat, const EventPtr & watch)
{
int32_t code = getChildrenImpl(path, res, stat, callbackForEvent(watch));
@ -232,7 +253,7 @@ int32_t ZooKeeper::tryGetChildren(const std::string & path, Strings & res,
}
int32_t ZooKeeper::tryGetChildrenWatch(const std::string & path, Strings & res,
Coordination::Stat * stat, Coordination::WatchCallback watch_callback)
Coordination::Stat * stat, Coordination::WatchCallback watch_callback)
{
int32_t code = getChildrenImpl(path, res, stat, watch_callback);
@ -272,9 +293,9 @@ int32_t ZooKeeper::tryCreate(const std::string & path, const std::string & data,
int32_t code = createImpl(path, data, mode, path_created);
if (!(code == Coordination::ZOK ||
code == Coordination::ZNONODE ||
code == Coordination::ZNODEEXISTS ||
code == Coordination::ZNOCHILDRENFOREPHEMERALS))
code == Coordination::ZNONODE ||
code == Coordination::ZNODEEXISTS ||
code == Coordination::ZNOCHILDRENFOREPHEMERALS))
throw KeeperException(code, path);
return code;
@ -336,9 +357,9 @@ int32_t ZooKeeper::tryRemove(const std::string & path, int32_t version)
{
int32_t code = removeImpl(path, version);
if (!(code == Coordination::ZOK ||
code == Coordination::ZNONODE ||
code == Coordination::ZBADVERSION ||
code == Coordination::ZNOTEMPTY))
code == Coordination::ZNONODE ||
code == Coordination::ZBADVERSION ||
code == Coordination::ZNOTEMPTY))
throw KeeperException(code, path);
return code;
}
@ -439,7 +460,7 @@ bool ZooKeeper::tryGetWatch(const std::string & path, std::string & res, Coordin
}
int32_t ZooKeeper::setImpl(const std::string & path, const std::string & data,
int32_t version, Coordination::Stat * stat)
int32_t version, Coordination::Stat * stat)
{
int32_t code = 0;
Poco::Event event;
@ -474,13 +495,13 @@ void ZooKeeper::createOrUpdate(const std::string & path, const std::string & dat
}
int32_t ZooKeeper::trySet(const std::string & path, const std::string & data,
int32_t version, Coordination::Stat * stat)
int32_t version, Coordination::Stat * stat)
{
int32_t code = setImpl(path, data, version, stat);
if (!(code == Coordination::ZOK ||
code == Coordination::ZNONODE ||
code == Coordination::ZBADVERSION))
code == Coordination::ZNONODE ||
code == Coordination::ZBADVERSION))
throw KeeperException(code, path);
return code;
}
@ -866,15 +887,15 @@ size_t KeeperMultiException::getFailedOpIndex(int32_t exception_code, const Coor
if (!Coordination::isUserError(exception_code))
throw DB::Exception("There are no failed OPs because '" + ZooKeeper::error2string(exception_code) + "' is not valid response code for that",
DB::ErrorCodes::LOGICAL_ERROR);
DB::ErrorCodes::LOGICAL_ERROR);
throw DB::Exception("There is no failed OpResult", DB::ErrorCodes::LOGICAL_ERROR);
}
KeeperMultiException::KeeperMultiException(int32_t exception_code, const Coordination::Requests & requests, const Coordination::Responses & responses)
: KeeperException("Transaction failed", exception_code),
requests(requests), responses(responses), failed_op_index(getFailedOpIndex(exception_code, responses))
: KeeperException("Transaction failed", exception_code),
requests(requests), responses(responses), failed_op_index(getFailedOpIndex(exception_code, responses))
{
addMessage("Op #" + std::to_string(failed_op_index) + ", path: " + getPathForFirstFailedOp());
}
@ -931,5 +952,4 @@ Coordination::RequestPtr makeCheckRequest(const std::string & path, int version)
request->version = version;
return request;
}
}

View File

@ -55,7 +55,8 @@ public:
ZooKeeper(const std::string & hosts, const std::string & identity = "",
int32_t session_timeout_ms = DEFAULT_SESSION_TIMEOUT,
int32_t operation_timeout_ms = DEFAULT_OPERATION_TIMEOUT,
const std::string & chroot = "");
const std::string & chroot = "",
const std::string & implementation = "zookeeper");
/** Config of the form:
<zookeeper>
@ -127,7 +128,7 @@ public:
bool tryGetWatch(const std::string & path, std::string & res, Coordination::Stat * stat, Coordination::WatchCallback watch_callback, int * code = nullptr);
void set(const std::string & path, const std::string & data,
int32_t version = -1, Coordination::Stat * stat = nullptr);
int32_t version = -1, Coordination::Stat * stat = nullptr);
/// Creates the node if it doesn't exist. Updates its contents otherwise.
void createOrUpdate(const std::string & path, const std::string & data, int32_t mode);
@ -136,7 +137,7 @@ public:
/// * The node doesn't exist.
/// * Versions do not match.
int32_t trySet(const std::string & path, const std::string & data,
int32_t version = -1, Coordination::Stat * stat = nullptr);
int32_t version = -1, Coordination::Stat * stat = nullptr);
Strings getChildren(const std::string & path,
Coordination::Stat * stat = nullptr,
@ -149,12 +150,12 @@ public:
/// Doesn't not throw in the following cases:
/// * The node doesn't exist.
int32_t tryGetChildren(const std::string & path, Strings & res,
Coordination::Stat * stat = nullptr,
const EventPtr & watch = nullptr);
Coordination::Stat * stat = nullptr,
const EventPtr & watch = nullptr);
int32_t tryGetChildrenWatch(const std::string & path, Strings & res,
Coordination::Stat * stat,
Coordination::WatchCallback watch_callback);
Coordination::Stat * stat,
Coordination::WatchCallback watch_callback);
/// Performs several operations in a transaction.
/// Throws on every error.
@ -235,7 +236,7 @@ public:
private:
friend class EphemeralNodeHolder;
void init(const std::string & hosts_, const std::string & identity_,
void init(const std::string & implementation_, const std::string & hosts_, const std::string & identity_,
int32_t session_timeout_ms_, int32_t operation_timeout_ms_, const std::string & chroot_);
void removeChildrenRecursive(const std::string & path);
@ -274,7 +275,7 @@ public:
using Ptr = std::shared_ptr<EphemeralNodeHolder>;
EphemeralNodeHolder(const std::string & path_, ZooKeeper & zookeeper_, bool create, bool sequential, const std::string & data)
: path(path_), zookeeper(zookeeper_)
: path(path_), zookeeper(zookeeper_)
{
if (create)
path = zookeeper.create(path, data, sequential ? CreateMode::EphemeralSequential : CreateMode::Ephemeral);
@ -320,5 +321,4 @@ private:
};
using EphemeralNodeHolderPtr = EphemeralNodeHolder::Ptr;
}

View File

@ -1,16 +1,5 @@
<yandex>
<zookeeper>
<node>
<host>localhost</host>
<port>2181</port>
</node>
<node>
<host>yandex.ru</host>
<port>2181</port>
</node>
<node>
<host>111.0.1.2</host>
<port>2181</port>
</node>
<implementation>testkeeper</implementation>
</zookeeper>
</yandex>