"reconfig" support for CH Keeper

Mike Kot 2023-04-20 13:26:02 +00:00
parent bc4a361d69
commit 8b6376005a
70 changed files with 2309 additions and 367 deletions
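This commit adds a "reconfig" operation to the Keeper client interface and server. As a quick orientation, here is a minimal sketch (not part of the commit) of how a caller might invoke the new operation through the `Coordination::IKeeper` interface added in this diff; the signature comes from the `IKeeper.h` changes below, while the connected `keeper` instance and the server-spec string are illustrative assumptions.

```cpp
#include <Common/ZooKeeper/IKeeper.h>

/// Hypothetical caller: `keeper` is assumed to be an already-connected IKeeper implementation.
void requestAddServerFour(Coordination::IKeeper & keeper)
{
    keeper.reconfig(
        /*joining=*/ "server.4=host4:9234",  // server-spec format is an assumption, cf. parseRaftServers below
        /*leaving=*/ "",
        /*new_members=*/ "",                 // non-incremental reconfig is rejected by the state machine
        /*version=*/ -1,                     // any other version is answered with ZBADVERSION
        [](const Coordination::ReconfigResponse & response)
        {
            // On success (ZOK) `response.value` holds the serialized resulting cluster config.
        });
}
```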

View File

@ -448,7 +448,7 @@ inline char * find_last_not_symbols_or_null(char * begin, char * end)
/// See https://github.com/boostorg/algorithm/issues/63
/// And https://bugs.llvm.org/show_bug.cgi?id=41141
template <char... symbols, typename To>
inline void splitInto(To & to, const std::string & what, bool token_compress = false)
inline To& splitInto(To & to, std::string_view what, bool token_compress = false)
{
const char * pos = what.data();
const char * end = pos + what.size();
@ -464,4 +464,6 @@ inline void splitInto(To & to, const std::string & what, bool token_compress = f
else
pos = delimiter_or_end;
}
return to;
}
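A small sketch (not part of the diff) of the updated `splitInto`, which now takes a `std::string_view` and returns the output container; this mirrors how `leavingToClusterUpdates` below splits the comma-separated server list.

```cpp
#include <base/find_symbols.h>
#include <string_view>
#include <vector>

void splitExample()
{
    std::vector<std::string_view> ids;
    /// Splits "1,2,3" on ',' and returns a reference to `ids`, so the result can be used directly.
    for (std::string_view id : splitInto<','>(ids, "1,2,3"))
    {
        /// id is "1", then "2", then "3"
        (void)id;
    }
}
```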

base/base/move_extend.h Normal file
View File

@ -0,0 +1,9 @@
#pragma once
/// Extend @p to by moving elements from @p from to the end of @p to.
/// @return iterator in @p to pointing to the first moved element.
template <class To, class From>
typename To::iterator moveExtend(To & to, From && from)
{
return to.insert(to.end(), std::make_move_iterator(from.begin()), std::make_move_iterator(from.end()));
}
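A brief usage sketch (not part of the commit); `KeeperStateMachine::processReconfiguration` later in this diff uses `moveExtend` to concatenate the joining and leaving update lists.

```cpp
#include <base/move_extend.h>
#include <string>
#include <vector>

void moveExtendExample()
{
    std::vector<std::string> all = {"a"};
    std::vector<std::string> more = {"b", "c"};

    /// Appends the elements of `more` to `all` by moving them; `more` is left with moved-from strings.
    auto first_moved = moveExtend(all, std::move(more));
    /// Now all == {"a", "b", "c"} and *first_moved == "b".
    (void)first_moved;
}
```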

contrib/NuRaft vendored

@ -1 +1 @@
Subproject commit 491eaf592d950e0e37accbe8b3f217e068c9fecf
Subproject commit eb1572129c71beb2156dcdaadc3fb136954aed96

View File

@ -34,6 +34,8 @@ add_dependencies(clickhouse-keeper-lib clickhouse_keeper_configs)
if (BUILD_STANDALONE_KEEPER)
# Straight list of all required sources
set(CLICKHOUSE_KEEPER_STANDALONE_SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Coordination/KeeperReconfiguration.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Coordination/RaftServerConfig.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Coordination/ACLMap.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Coordination/Changelog.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Coordination/CoordinationSettings.cpp

View File

@ -125,6 +125,7 @@
M(ZooKeeperMulti, "Number of 'multi' requests to ZooKeeper (compound transactions).") \
M(ZooKeeperCheck, "Number of 'check' requests to ZooKeeper. Usually they don't make sense in isolation, only as part of a complex transaction.") \
M(ZooKeeperSync, "Number of 'sync' requests to ZooKeeper. These requests are rarely needed or usable.") \
M(ZooKeeperReconfig, "Number of 'reconfig' requests to ZooKeeper.") \
M(ZooKeeperClose, "Number of times connection with ZooKeeper has been closed voluntary.") \
M(ZooKeeperWatchResponse, "Number of times watch notification has been received from ZooKeeper.") \
M(ZooKeeperUserExceptions, "Number of exceptions while working with ZooKeeper related to the data (no node, bad version or similar).") \
@ -499,6 +500,7 @@ The server successfully detected this situation and will download merged part fr
M(KeeperCreateRequest, "Number of create requests")\
M(KeeperRemoveRequest, "Number of remove requests")\
M(KeeperSetRequest, "Number of set requests")\
M(KeeperReconfigRequest, "Number of reconfig requests")\
M(KeeperCheckRequest, "Number of check requests")\
M(KeeperMultiRequest, "Number of multi requests")\
M(KeeperMultiReadRequest, "Number of multi read requests")\

View File

@ -110,6 +110,7 @@ const char * errorMessage(Error code)
case Error::ZCLOSING: return "ZooKeeper is closing";
case Error::ZNOTHING: return "(not error) no server responses to process";
case Error::ZSESSIONMOVED: return "Session moved to another server, so operation is ignored";
case Error::ZRECONFIGINPROGRESS: return "Another reconfiguration is in progress";
}
UNREACHABLE();

View File

@ -82,6 +82,7 @@ enum class Error : int32_t
ZOPERATIONTIMEOUT = -7, /// Operation timeout
ZBADARGUMENTS = -8, /// Invalid arguments
ZINVALIDSTATE = -9, /// Invalid zhandle state
ZRECONFIGINPROGRESS = -14, /// Another reconfig is running
/** API errors.
* This is never thrown by the server, it shouldn't be used other than
@ -350,6 +351,29 @@ struct SyncResponse : virtual Response
size_t bytesSize() const override { return path.size(); }
};
struct ReconfigRequest : virtual Request
{
String joining;
String leaving;
String new_members;
int32_t version;
String getPath() const final { return keeper_config_path; }
size_t bytesSize() const final
{
return joining.size() + leaving.size() + new_members.size() + sizeof(version);
}
};
struct ReconfigResponse : virtual Response
{
String value;
Stat stat;
size_t bytesSize() const override { return value.size() + sizeof(stat); }
};
struct MultiRequest : virtual Request
{
Requests requests;
@ -395,9 +419,9 @@ using SetCallback = std::function<void(const SetResponse &)>;
using ListCallback = std::function<void(const ListResponse &)>;
using CheckCallback = std::function<void(const CheckResponse &)>;
using SyncCallback = std::function<void(const SyncResponse &)>;
using ReconfigCallback = std::function<void(const ReconfigResponse &)>;
using MultiCallback = std::function<void(const MultiResponse &)>;
/// For watches.
enum State
{
@ -526,6 +550,13 @@ public:
const String & path,
SyncCallback callback) = 0;
virtual void reconfig(
std::string_view joining,
std::string_view leaving,
std::string_view new_members,
int32_t version,
ReconfigCallback callback) = 0;
virtual void multi(
const Requests & requests,
MultiCallback callback) = 0;
@ -539,3 +570,11 @@ public:
};
}
template <> struct fmt::formatter<Coordination::Error> : fmt::formatter<std::string_view>
{
constexpr auto format(Coordination::Error code, auto& ctx)
{
return formatter<string_view>::format(Coordination::errorMessage(code), ctx);
}
};
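With this formatter, `Coordination::Error` values can be passed straight to fmt-based log and exception messages, which is what the rest of this diff does when it drops the explicit `errorMessage(err)` calls. A minimal sketch:

```cpp
#include <Common/ZooKeeper/IKeeper.h>
#include <fmt/format.h>
#include <string>

std::string describe(Coordination::Error err)
{
    /// Formats via Coordination::errorMessage(), e.g. Error::ZSESSIONMOVED ->
    /// "Session moved to another server, so operation is ignored".
    return fmt::format("Keeper returned: {}", err);
}
```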

View File

@ -3,12 +3,8 @@
#include <Common/setThreadName.h>
#include <Common/StringUtils/StringUtils.h>
#include <base/types.h>
#include <sstream>
#include <iomanip>
#include <functional>
namespace Coordination
{
@ -147,6 +143,14 @@ struct TestKeeperSyncRequest final : SyncRequest, TestKeeperRequest
std::pair<ResponsePtr, Undo> process(TestKeeper::Container & container, int64_t zxid) const override;
};
struct TestKeeperReconfigRequest final : ReconfigRequest, TestKeeperRequest
{
TestKeeperReconfigRequest() = default;
explicit TestKeeperReconfigRequest(const ReconfigRequest & base) : ReconfigRequest(base) {}
ResponsePtr createResponse() const override;
std::pair<ResponsePtr, Undo> process(TestKeeper::Container & container, int64_t zxid) const override;
};
struct TestKeeperMultiRequest final : MultiRequest, TestKeeperRequest
{
explicit TestKeeperMultiRequest(const Requests & generic_requests)
@ -226,15 +230,7 @@ std::pair<ResponsePtr, Undo> TestKeeperCreateRequest::process(TestKeeper::Contai
std::string path_created = path;
if (is_sequential)
{
auto seq_num = it->second.seq_num;
std::stringstream seq_num_str; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
seq_num_str.exceptions(std::ios::failbit);
seq_num_str << std::setw(10) << std::setfill('0') << seq_num;
path_created += seq_num_str.str();
}
path_created += fmt::format("{:0>10}", it->second.seq_num);
/// Increment sequential number even if node is not sequential
++it->second.seq_num;
@ -446,6 +442,17 @@ std::pair<ResponsePtr, Undo> TestKeeperSyncRequest::process(TestKeeper::Containe
return { std::make_shared<SyncResponse>(std::move(response)), {} };
}
std::pair<ResponsePtr, Undo> TestKeeperReconfigRequest::process(TestKeeper::Container &, int64_t) const
{
// In TestKeeper we assume data is stored on one server, so this is a dummy implementation to
// satisfy the IKeeper interface.
// We can't even check the validity of the input data, nor can we create the /keeper/config znode,
// as we don't know the id of the current "server".
ReconfigResponse response;
response.error = Error::ZOK;
return { std::make_shared<ReconfigResponse>(std::move(response)), {} };
}
std::pair<ResponsePtr, Undo> TestKeeperMultiRequest::process(TestKeeper::Container & container, int64_t zxid) const
{
MultiResponse response;
@ -505,6 +512,7 @@ ResponsePtr TestKeeperSetRequest::createResponse() const { return std::make_shar
ResponsePtr TestKeeperListRequest::createResponse() const { return std::make_shared<ListResponse>(); }
ResponsePtr TestKeeperCheckRequest::createResponse() const { return std::make_shared<CheckResponse>(); }
ResponsePtr TestKeeperSyncRequest::createResponse() const { return std::make_shared<SyncResponse>(); }
ResponsePtr TestKeeperReconfigRequest::createResponse() const { return std::make_shared<ReconfigResponse>(); }
ResponsePtr TestKeeperMultiRequest::createResponse() const { return std::make_shared<MultiResponse>(); }
@ -828,6 +836,28 @@ void TestKeeper::sync(
pushRequest(std::move(request_info));
}
void TestKeeper::reconfig(
std::string_view joining,
std::string_view leaving,
std::string_view new_members,
int32_t version,
ReconfigCallback callback)
{
TestKeeperReconfigRequest req;
req.joining = joining;
req.leaving = leaving;
req.new_members = new_members;
req.version = version;
pushRequest({
.request = std::make_shared<TestKeeperReconfigRequest>(std::move(req)),
.callback = [callback](const Response & response)
{
callback(dynamic_cast<const ReconfigResponse &>(response));
}
});
}
void TestKeeper::multi(
const Requests & requests,
MultiCallback callback)

View File

@ -87,6 +87,13 @@ public:
const String & path,
SyncCallback callback) override;
void reconfig(
std::string_view joining,
std::string_view leaving,
std::string_view new_members,
int32_t version,
ReconfigCallback callback) final;
void multi(
const Requests & requests,
MultiCallback callback) override;

View File

@ -75,13 +75,14 @@ void ZooKeeper::init(ZooKeeperArgs args_)
auto & host_string = host.host;
try
{
bool secure = startsWith(host_string, "secure://");
const bool secure = startsWith(host_string, "secure://");
if (secure)
host_string.erase(0, strlen("secure://"));
LOG_TEST(log, "Adding ZooKeeper host {} ({})", host_string, Poco::Net::SocketAddress{host_string}.toString());
nodes.emplace_back(Coordination::ZooKeeper::Node{Poco::Net::SocketAddress{host_string}, secure});
const Poco::Net::SocketAddress host_socket_addr{host_string};
LOG_TEST(log, "Adding ZooKeeper host {} ({})", host_string, host_socket_addr.toString());
nodes.emplace_back(Coordination::ZooKeeper::Node{host_socket_addr, secure});
}
catch (const Poco::Net::HostNotFoundException & e)
{
@ -191,12 +192,7 @@ std::vector<ShuffleHost> ZooKeeper::shuffleHosts() const
shuffle_hosts.emplace_back(shuffle_host);
}
::sort(
shuffle_hosts.begin(), shuffle_hosts.end(),
[](const ShuffleHost & lhs, const ShuffleHost & rhs)
{
return ShuffleHost::compare(lhs, rhs);
});
::sort(shuffle_hosts.begin(), shuffle_hosts.end(), ShuffleHost::compare);
return shuffle_hosts;
}
@ -231,7 +227,7 @@ Coordination::Error ZooKeeper::getChildrenImpl(const std::string & path, Strings
if (future_result.wait_for(std::chrono::milliseconds(args.operation_timeout_ms)) != std::future_status::ready)
{
impl->finalize(fmt::format("Operation timeout on {} {}", toString(Coordination::OpNum::List), path));
impl->finalize(fmt::format("Operation timeout on {} {}", Coordination::OpNum::List, path));
return Coordination::Error::ZOPERATIONTIMEOUT;
}
else
@ -298,7 +294,7 @@ Coordination::Error ZooKeeper::createImpl(const std::string & path, const std::s
if (future_result.wait_for(std::chrono::milliseconds(args.operation_timeout_ms)) != std::future_status::ready)
{
impl->finalize(fmt::format("Operation timeout on {} {}", toString(Coordination::OpNum::Create), path));
impl->finalize(fmt::format("Operation timeout on {} {}", Coordination::OpNum::Create, path));
return Coordination::Error::ZOPERATIONTIMEOUT;
}
else
@ -393,7 +389,7 @@ Coordination::Error ZooKeeper::removeImpl(const std::string & path, int32_t vers
if (future_result.wait_for(std::chrono::milliseconds(args.operation_timeout_ms)) != std::future_status::ready)
{
impl->finalize(fmt::format("Operation timeout on {} {}", toString(Coordination::OpNum::Remove), path));
impl->finalize(fmt::format("Operation timeout on {} {}", Coordination::OpNum::Remove, path));
return Coordination::Error::ZOPERATIONTIMEOUT;
}
else
@ -425,7 +421,7 @@ Coordination::Error ZooKeeper::existsImpl(const std::string & path, Coordination
if (future_result.wait_for(std::chrono::milliseconds(args.operation_timeout_ms)) != std::future_status::ready)
{
impl->finalize(fmt::format("Operation timeout on {} {}", toString(Coordination::OpNum::Exists), path));
impl->finalize(fmt::format("Operation timeout on {} {}", Coordination::OpNum::Exists, path));
return Coordination::Error::ZOPERATIONTIMEOUT;
}
else
@ -459,7 +455,7 @@ Coordination::Error ZooKeeper::getImpl(const std::string & path, std::string & r
if (future_result.wait_for(std::chrono::milliseconds(args.operation_timeout_ms)) != std::future_status::ready)
{
impl->finalize(fmt::format("Operation timeout on {} {}", toString(Coordination::OpNum::Get), path));
impl->finalize(fmt::format("Operation timeout on {} {}", Coordination::OpNum::Get, path));
return Coordination::Error::ZOPERATIONTIMEOUT;
}
else
@ -531,7 +527,7 @@ Coordination::Error ZooKeeper::setImpl(const std::string & path, const std::stri
if (future_result.wait_for(std::chrono::milliseconds(args.operation_timeout_ms)) != std::future_status::ready)
{
impl->finalize(fmt::format("Operation timeout on {} {}", toString(Coordination::OpNum::Set), path));
impl->finalize(fmt::format("Operation timeout on {} {}", Coordination::OpNum::Set, path));
return Coordination::Error::ZOPERATIONTIMEOUT;
}
else
@ -583,7 +579,7 @@ Coordination::Error ZooKeeper::multiImpl(const Coordination::Requests & requests
if (future_result.wait_for(std::chrono::milliseconds(args.operation_timeout_ms)) != std::future_status::ready)
{
impl->finalize(fmt::format("Operation timeout on {} {}", toString(Coordination::OpNum::Multi), requests[0]->getPath()));
impl->finalize(fmt::format("Operation timeout on {} {}", Coordination::OpNum::Multi, requests[0]->getPath()));
return Coordination::Error::ZOPERATIONTIMEOUT;
}
else
@ -617,7 +613,7 @@ Coordination::Error ZooKeeper::syncImpl(const std::string & path, std::string &
if (future_result.wait_for(std::chrono::milliseconds(args.operation_timeout_ms)) != std::future_status::ready)
{
impl->finalize(fmt::format("Operation timeout on {} {}", toString(Coordination::OpNum::Sync), path));
impl->finalize(fmt::format("Operation timeout on {} {}", Coordination::OpNum::Sync, path));
return Coordination::Error::ZOPERATIONTIMEOUT;
}
else
@ -1229,7 +1225,7 @@ size_t getFailedOpIndex(Coordination::Error exception_code, const Coordination::
if (!Coordination::isUserError(exception_code))
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR,
"There are no failed OPs because '{}' is not valid response code for that",
std::string(Coordination::errorMessage(exception_code)));
exception_code);
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "There is no failed OpResult");
}

View File

@ -36,7 +36,7 @@ std::string ZooKeeperRequest::toString() const
"OpNum = {}\n"
"Additional info:\n{}",
xid,
Coordination::toString(getOpNum()),
getOpNum(),
toStringImpl());
}
@ -76,6 +76,41 @@ void ZooKeeperSyncResponse::writeImpl(WriteBuffer & out) const
Coordination::write(path, out);
}
void ZooKeeperReconfigRequest::writeImpl(WriteBuffer & out) const
{
Coordination::write(joining, out);
Coordination::write(leaving, out);
Coordination::write(new_members, out);
Coordination::write(version, out);
}
void ZooKeeperReconfigRequest::readImpl(ReadBuffer & in)
{
Coordination::read(joining, in);
Coordination::read(leaving, in);
Coordination::read(new_members, in);
Coordination::read(version, in);
}
std::string ZooKeeperReconfigRequest::toStringImpl() const
{
return fmt::format(
"joining = {}\nleaving = {}\nnew_members = {}\nversion = {}",
joining, leaving, new_members, version);
}
void ZooKeeperReconfigResponse::readImpl(ReadBuffer & in)
{
Coordination::read(value, in);
Coordination::read(stat, in);
}
void ZooKeeperReconfigResponse::writeImpl(WriteBuffer & out) const
{
Coordination::write(value, out);
Coordination::write(stat, out);
}
void ZooKeeperWatchResponse::readImpl(ReadBuffer & in)
{
Coordination::read(type, in);
@ -664,6 +699,7 @@ ZooKeeperResponsePtr ZooKeeperRemoveRequest::makeResponse() const { return setTi
ZooKeeperResponsePtr ZooKeeperExistsRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperExistsResponse>()); }
ZooKeeperResponsePtr ZooKeeperGetRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperGetResponse>()); }
ZooKeeperResponsePtr ZooKeeperSetRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperSetResponse>()); }
ZooKeeperResponsePtr ZooKeeperReconfigRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperReconfigResponse>()); }
ZooKeeperResponsePtr ZooKeeperListRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperListResponse>()); }
ZooKeeperResponsePtr ZooKeeperSimpleListRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperSimpleListResponse>()); }
@ -861,7 +897,8 @@ void ZooKeeperMultiResponse::fillLogElements(LogElements & elems, size_t idx) co
void ZooKeeperRequestFactory::registerRequest(OpNum op_num, Creator creator)
{
if (!op_num_to_request.try_emplace(op_num, creator).second)
throw Coordination::Exception("Request type " + toString(op_num) + " already registered", Coordination::Error::ZRUNTIMEINCONSISTENCY);
throw Coordination::Exception(Coordination::Error::ZRUNTIMEINCONSISTENCY,
"Request type {} already registered", op_num);
}
std::shared_ptr<ZooKeeperRequest> ZooKeeperRequest::read(ReadBuffer & in)
@ -916,7 +953,7 @@ ZooKeeperRequestPtr ZooKeeperRequestFactory::get(OpNum op_num) const
{
auto it = op_num_to_request.find(op_num);
if (it == op_num_to_request.end())
throw Exception("Unknown operation type " + toString(op_num), Error::ZBADARGUMENTS);
throw Exception(Error::ZBADARGUMENTS, "Unknown operation type {}", op_num);
return it->second();
}
@ -960,6 +997,7 @@ ZooKeeperRequestFactory::ZooKeeperRequestFactory()
registerZooKeeperRequest<OpNum::SimpleList, ZooKeeperSimpleListRequest>(*this);
registerZooKeeperRequest<OpNum::List, ZooKeeperListRequest>(*this);
registerZooKeeperRequest<OpNum::Check, ZooKeeperCheckRequest>(*this);
registerZooKeeperRequest<OpNum::Reconfig, ZooKeeperReconfigRequest>(*this);
registerZooKeeperRequest<OpNum::Multi, ZooKeeperMultiRequest>(*this);
registerZooKeeperRequest<OpNum::MultiRead, ZooKeeperMultiRequest>(*this);
registerZooKeeperRequest<OpNum::SessionID, ZooKeeperSessionIDRequest>(*this);

View File

@ -117,6 +117,35 @@ struct ZooKeeperSyncResponse final : SyncResponse, ZooKeeperResponse
OpNum getOpNum() const override { return OpNum::Sync; }
};
struct ZooKeeperReconfigRequest final : ZooKeeperRequest
{
String joining;
String leaving;
String new_members;
int64_t version; // kazoo sends a 64bit integer in this request
String getPath() const override { return keeper_config_path; }
OpNum getOpNum() const override { return OpNum::Reconfig; }
void writeImpl(WriteBuffer & out) const override;
void readImpl(ReadBuffer & in) override;
std::string toStringImpl() const override;
ZooKeeperResponsePtr makeResponse() const override;
bool isReadRequest() const override { return false; }
size_t bytesSize() const override
{
return ZooKeeperRequest::bytesSize() + joining.size() + leaving.size() + new_members.size()
+ sizeof(version);
}
};
struct ZooKeeperReconfigResponse final : ReconfigResponse, ZooKeeperResponse
{
void readImpl(ReadBuffer & in) override;
void writeImpl(WriteBuffer & out) const override;
OpNum getOpNum() const override { return OpNum::Reconfig; }
};
struct ZooKeeperHeartbeatResponse final : ZooKeeperResponse
{
void readImpl(ReadBuffer &) override {}

View File

@ -19,6 +19,7 @@ static const std::unordered_set<int32_t> VALID_OPERATIONS =
static_cast<int32_t>(OpNum::Heartbeat),
static_cast<int32_t>(OpNum::List),
static_cast<int32_t>(OpNum::Check),
static_cast<int32_t>(OpNum::Reconfig),
static_cast<int32_t>(OpNum::Multi),
static_cast<int32_t>(OpNum::MultiRead),
static_cast<int32_t>(OpNum::Auth),
@ -29,55 +30,6 @@ static const std::unordered_set<int32_t> VALID_OPERATIONS =
static_cast<int32_t>(OpNum::CheckNotExists),
};
std::string toString(OpNum op_num)
{
switch (op_num)
{
case OpNum::Close:
return "Close";
case OpNum::Error:
return "Error";
case OpNum::Create:
return "Create";
case OpNum::Remove:
return "Remove";
case OpNum::Exists:
return "Exists";
case OpNum::Get:
return "Get";
case OpNum::Set:
return "Set";
case OpNum::SimpleList:
return "SimpleList";
case OpNum::List:
return "List";
case OpNum::Check:
return "Check";
case OpNum::Multi:
return "Multi";
case OpNum::MultiRead:
return "MultiRead";
case OpNum::Sync:
return "Sync";
case OpNum::Heartbeat:
return "Heartbeat";
case OpNum::Auth:
return "Auth";
case OpNum::SessionID:
return "SessionID";
case OpNum::SetACL:
return "SetACL";
case OpNum::GetACL:
return "GetACL";
case OpNum::FilteredList:
return "FilteredList";
case OpNum::CheckNotExists:
return "CheckNotExists";
}
int32_t raw_op = static_cast<int32_t>(op_num);
throw Exception("Operation " + std::to_string(raw_op) + " is unknown", Error::ZUNIMPLEMENTED);
}
OpNum getOpNum(int32_t raw_op_num)
{
if (!VALID_OPERATIONS.contains(raw_op_num))

View File

@ -31,6 +31,7 @@ enum class OpNum : int32_t
List = 12,
Check = 13,
Multi = 14,
Reconfig = 16,
MultiRead = 22,
Auth = 100,
@ -41,7 +42,6 @@ enum class OpNum : int32_t
SessionID = 997, /// Special internal request
};
std::string toString(OpNum op_num);
OpNum getOpNum(int32_t raw_op_num);
static constexpr int32_t ZOOKEEPER_PROTOCOL_VERSION = 0;

View File

@ -35,6 +35,7 @@ namespace ProfileEvents
extern const Event ZooKeeperRemove;
extern const Event ZooKeeperExists;
extern const Event ZooKeeperMulti;
extern const Event ZooKeeperReconfig;
extern const Event ZooKeeperGet;
extern const Event ZooKeeperSet;
extern const Event ZooKeeperList;
@ -571,7 +572,7 @@ void ZooKeeper::sendAuth(const String & scheme, const String & data)
if (err != Error::ZOK)
throw Exception(Error::ZMARSHALLINGERROR, "Error received in reply to auth request. Code: {}. Message: {}",
static_cast<int32_t>(err), errorMessage(err));
static_cast<int32_t>(err), err);
}
void ZooKeeper::sendThread()
@ -697,7 +698,7 @@ void ZooKeeper::receiveThread()
if (earliest_operation)
{
throw Exception(Error::ZOPERATIONTIMEOUT, "Operation timeout (no response in {} ms) for request {} for path: {}",
args.operation_timeout_ms, toString(earliest_operation->request->getOpNum()), earliest_operation->request->getPath());
args.operation_timeout_ms, earliest_operation->request->getOpNum(), earliest_operation->request->getPath());
}
waited_us += max_wait_us;
if (waited_us >= args.session_timeout_ms * 1000)
@ -738,7 +739,7 @@ void ZooKeeper::receiveEvent()
if (xid == PING_XID)
{
if (err != Error::ZOK)
throw Exception(Error::ZRUNTIMEINCONSISTENCY, "Received error in heartbeat response: {}", errorMessage(err));
throw Exception(Error::ZRUNTIMEINCONSISTENCY, "Received error in heartbeat response: {}", err);
response = std::make_shared<ZooKeeperHeartbeatResponse>();
}
@ -1195,7 +1196,6 @@ void ZooKeeper::create(
ProfileEvents::increment(ProfileEvents::ZooKeeperCreate);
}
void ZooKeeper::remove(
const String & path,
int32_t version,
@ -1335,6 +1335,26 @@ void ZooKeeper::sync(
ProfileEvents::increment(ProfileEvents::ZooKeeperSync);
}
void ZooKeeper::reconfig(
std::string_view joining,
std::string_view leaving,
std::string_view new_members,
int32_t version,
ReconfigCallback callback)
{
ZooKeeperReconfigRequest request;
request.joining = joining;
request.leaving = leaving;
request.new_members = new_members;
request.version = version;
RequestInfo request_info;
request_info.request = std::make_shared<ZooKeeperReconfigRequest>(std::move(request));
request_info.callback = [callback](const Response & response) { callback(dynamic_cast<const ReconfigResponse &>(response)); };
pushRequest(std::move(request_info));
ProfileEvents::increment(ProfileEvents::ZooKeeperReconfig);
}
void ZooKeeper::multi(
const Requests & requests,

View File

@ -178,6 +178,13 @@ public:
const String & path,
SyncCallback callback) override;
void reconfig(
std::string_view joining,
std::string_view leaving,
std::string_view new_members,
int32_t version,
ReconfigCallback callback) final;
void multi(
const Requests & requests,
MultiCallback callback) override;

View File

@ -1,5 +1,4 @@
#pragma once
#include <IO/WriteHelpers.h>
namespace DB
@ -14,8 +13,8 @@ enum class KeeperApiVersion : uint8_t
WITH_CHECK_NOT_EXISTS,
};
const std::string keeper_system_path = "/keeper";
const std::string keeper_api_version_path = keeper_system_path + "/api_version";
const std::string keeper_api_feature_flags_path = keeper_system_path + "/feature_flags";
const String keeper_system_path = "/keeper";
const String keeper_api_version_path = keeper_system_path + "/api_version";
const String keeper_api_feature_flags_path = keeper_system_path + "/feature_flags";
const String keeper_config_path = keeper_system_path + "/config";
}

View File

@ -1,10 +1,8 @@
#pragma once
#include <Poco/Util/AbstractConfiguration.h>
#include <Coordination/KeeperFeatureFlags.h>
#include <IO/WriteBufferFromString.h>
#include <Disks/DiskSelector.h>
#include <IO/WriteBufferFromString.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <cstdint>
#include <memory>
@ -12,6 +10,8 @@
namespace DB
{
class KeeperDispatcher;
class KeeperContext
{
public:
@ -51,6 +51,7 @@ public:
const KeeperFeatureFlags & getFeatureFlags() const;
void dumpConfiguration(WriteBufferFromOwnString & buf) const;
private:
/// local disk defined using path or disk name
using Storage = std::variant<DiskPtr, std::string>;
@ -85,6 +86,7 @@ private:
std::unordered_map<std::string, std::string> system_nodes_with_data;
KeeperFeatureFlags feature_flags;
KeeperDispatcher * dispatcher{nullptr};
};
using KeeperContextPtr = std::shared_ptr<KeeperContext>;

View File

@ -38,6 +38,8 @@ namespace ProfileEvents
extern const Event MemoryAllocatorPurgeTimeMicroseconds;
}
using namespace std::chrono_literals;
namespace DB
{
@ -336,6 +338,7 @@ void KeeperDispatcher::initialize(const Poco::Util::AbstractConfiguration & conf
keeper_context = std::make_shared<KeeperContext>(standalone_keeper);
keeper_context->initialize(config);
keeper_context->dispatcher = this;
server = std::make_unique<KeeperServer>(
configuration_and_settings,
@ -392,7 +395,10 @@ void KeeperDispatcher::initialize(const Poco::Util::AbstractConfiguration & conf
/// Start it after keeper server start
session_cleaner_thread = ThreadFromGlobalPool([this] { sessionCleanerTask(); });
update_configuration_thread = ThreadFromGlobalPool([this] { updateConfigurationThread(); });
update_configuration_thread = reconfigEnabled()
? ThreadFromGlobalPool([this] { clusterUpdateThread(); })
: ThreadFromGlobalPool([this] { clusterUpdateWithReconfigDisabledThread(); });
LOG_DEBUG(log, "Dispatcher initialized");
}
@ -429,7 +435,7 @@ void KeeperDispatcher::shutdown()
if (snapshot_thread.joinable())
snapshot_thread.join();
update_configuration_queue.finish();
cluster_update_queue.finish();
if (update_configuration_thread.joinable())
update_configuration_thread.join();
}
@ -608,7 +614,7 @@ void KeeperDispatcher::addErrorResponses(const KeeperStorage::RequestsForSession
"Could not push error response xid {} zxid {} error message {} to responses queue",
response->xid,
response->zxid,
errorMessage(error));
error);
}
}
@ -653,7 +659,7 @@ int64_t KeeperDispatcher::getSessionID(int64_t session_timeout_ms)
{
if (response->getOpNum() != Coordination::OpNum::SessionID)
promise->set_exception(std::make_exception_ptr(Exception(ErrorCodes::LOGICAL_ERROR,
"Incorrect response of type {} instead of SessionID response", Coordination::toString(response->getOpNum()))));
"Incorrect response of type {} instead of SessionID response", response->getOpNum())));
auto session_id_response = dynamic_cast<const Coordination::ZooKeeperSessionIDResponse &>(*response);
if (session_id_response.internal_id != internal_id)
@ -685,17 +691,12 @@ int64_t KeeperDispatcher::getSessionID(int64_t session_timeout_ms)
return future.get();
}
void KeeperDispatcher::updateConfigurationThread()
void KeeperDispatcher::clusterUpdateWithReconfigDisabledThread()
{
while (true)
while (!shutdown_called)
{
if (shutdown_called)
return;
try
{
using namespace std::chrono_literals;
if (!server->checkInit())
{
LOG_INFO(log, "Server still not initialized, will not apply configuration until initialization finished");
@ -710,11 +711,10 @@ void KeeperDispatcher::updateConfigurationThread()
continue;
}
ConfigUpdateAction action;
if (!update_configuration_queue.pop(action))
ClusterUpdateAction action;
if (!cluster_update_queue.pop(action))
break;
/// We must wait for this update from the leader or apply it ourselves (if we are the leader)
bool done = false;
while (!done)
@ -727,15 +727,13 @@ void KeeperDispatcher::updateConfigurationThread()
if (isLeader())
{
server->applyConfigurationUpdate(action);
server->applyConfigUpdateWithReconfigDisabled(action);
done = true;
}
else
{
done = server->waitConfigurationUpdate(action);
if (!done)
LOG_INFO(log, "Cannot wait for configuration update, maybe we become leader, or maybe update is invalid, will try to wait one more time");
}
else if (done = server->waitForConfigUpdateWithReconfigDisabled(action); !done)
LOG_INFO(log,
"Cannot wait for configuration update, maybe we became leader "
"or maybe update is invalid, will try to wait one more time");
}
}
catch (...)
@ -745,6 +743,46 @@ void KeeperDispatcher::updateConfigurationThread()
}
}
void KeeperDispatcher::clusterUpdateThread()
{
while (!shutdown_called)
{
ClusterUpdateAction action;
if (!cluster_update_queue.pop(action))
return;
if (server->applyConfigUpdate(action))
LOG_DEBUG(log, "Processing config update {}: accepted", action);
else // TODO (myrrc) sleep a random amount? sleep less?
{
(void)cluster_update_queue.pushFront(action);
LOG_DEBUG(log, "Processing config update {}: declined, backoff", action);
std::this_thread::sleep_for(50ms);
}
}
}
void KeeperDispatcher::pushClusterUpdates(ClusterUpdateActions&& actions)
{
if (shutdown_called) return;
for (auto && action : actions)
{
if (!cluster_update_queue.push(std::move(action)))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot push configuration update");
LOG_DEBUG(log, "Processing config update {}: pushed", action);
}
}
bool KeeperDispatcher::clusterUpdateQueueEmpty() const
{
return cluster_update_queue.empty();
}
bool KeeperDispatcher::reconfigEnabled() const
{
return server->reconfigEnabled();
}
bool KeeperDispatcher::isServerActive() const
{
return checkInit() && hasLeader() && !server->isRecovering();
@ -752,20 +790,25 @@ bool KeeperDispatcher::isServerActive() const
void KeeperDispatcher::updateConfiguration(const Poco::Util::AbstractConfiguration & config, const MultiVersion<Macros>::Version & macros)
{
auto diff = server->getConfigurationDiff(config);
auto diff = server->getRaftConfigurationDiff(config);
if (diff.empty())
LOG_TRACE(log, "Configuration update triggered, but nothing changed for RAFT");
LOG_TRACE(log, "Configuration update triggered, but nothing changed for Raft");
else if (reconfigEnabled())
LOG_WARNING(log,
"Raft configuration changed, but keeper_server.enable_reconfiguration is on. "
"This update will be ignored. Use \"reconfig\" instead");
else if (diff.size() > 1)
LOG_WARNING(log, "Configuration changed for more than one server ({}) from cluster, it's strictly not recommended", diff.size());
LOG_WARNING(log,
"Configuration changed for more than one server ({}) from cluster, "
"it's strictly not recommended", diff.size());
else
LOG_DEBUG(log, "Configuration change size ({})", diff.size());
for (auto & change : diff)
{
bool push_result = update_configuration_queue.push(change);
if (!push_result)
throw Exception(ErrorCodes::SYSTEM_ERROR, "Cannot push configuration update to queue");
}
if (!reconfigEnabled())
for (auto & change : diff)
if (!cluster_update_queue.push(change))
throw Exception(ErrorCodes::SYSTEM_ERROR, "Cannot push configuration update to queue");
snapshot_s3.updateS3Configuration(config, macros);
}

View File

@ -31,7 +31,7 @@ private:
using RequestsQueue = ConcurrentBoundedQueue<KeeperStorage::RequestForSession>;
using SessionToResponseCallback = std::unordered_map<int64_t, ZooKeeperResponseCallback>;
using UpdateConfigurationQueue = ConcurrentBoundedQueue<ConfigUpdateAction>;
using ClusterUpdateQueue = ConcurrentBoundedQueue<ClusterUpdateAction>;
/// Size depends on coordination settings
std::unique_ptr<RequestsQueue> requests_queue;
@ -39,7 +39,7 @@ private:
SnapshotsQueue snapshots_queue{1};
/// More than 1k updates is definitely misconfiguration.
UpdateConfigurationQueue update_configuration_queue{1000};
ClusterUpdateQueue cluster_update_queue{1000};
std::atomic<bool> shutdown_called{false};
@ -91,8 +91,10 @@ private:
void sessionCleanerTask();
/// Thread create snapshots in the background
void snapshotThread();
/// Thread apply or wait configuration changes from leader
void updateConfigurationThread();
// TODO (myrrc) this should be removed once "reconfig" is stabilized
void clusterUpdateWithReconfigDisabledThread();
void clusterUpdateThread();
void setResponse(int64_t session_id, const Coordination::ZooKeeperResponsePtr & response);
@ -132,10 +134,10 @@ public:
/// and achieved quorum
bool isServerActive() const;
/// Registered in ConfigReloader callback. Adds new configuration changes to
/// update_configuration_queue. Keeper Dispatcher applies them asynchronously.
/// 'macros' are used to substitute macros in the endpoints of disks
void updateConfiguration(const Poco::Util::AbstractConfiguration & config, const MultiVersion<Macros>::Version & macros);
void pushClusterUpdates(ClusterUpdateActions&& actions);
bool clusterUpdateQueueEmpty() const;
bool reconfigEnabled() const;
/// Shutdown internal keeper parts (server, state machine, log storage, etc)
void shutdown();

View File

@ -0,0 +1,92 @@
#include "KeeperReconfiguration.h"
#include <charconv>
#include <unordered_set>
#include <base/find_symbols.h>
#include <fmt/format.h>
namespace DB
{
ClusterUpdateActions joiningToClusterUpdates(const ClusterConfigPtr & cfg, std::string_view joining)
{
ClusterUpdateActions out;
std::unordered_set<String> endpoints;
for (const auto & server : cfg->get_servers())
endpoints.emplace(server->get_endpoint());
// We can either add new servers or change the weight of existing ones.
// It makes no sense to have a server in _joining_ that is identical to an existing one,
// including its weight, so such requests are declined.
for (const RaftServerConfig & update : parseRaftServers(joining))
if (auto server_ptr = cfg->get_server(update.id))
{
if (update.endpoint != server_ptr->get_endpoint() || update.learner != server_ptr->is_learner()
|| update.priority == server_ptr->get_priority())
return {}; // can't change server endpoint/type due to NuRaft API limitations
out.emplace_back(UpdateRaftServerPriority{.id = update.id, .priority = update.priority});
}
else if (endpoints.contains(update.endpoint))
return {};
else
out.emplace_back(AddRaftServer{update});
return out;
}
ClusterUpdateActions leavingToClusterUpdates(const ClusterConfigPtr & cfg, std::string_view leaving)
{
std::vector<std::string_view> leaving_arr;
splitInto<','>(leaving_arr, leaving);
if (leaving_arr.size() >= cfg->get_servers().size())
return {};
std::unordered_set<int> remove_ids;
ClusterUpdateActions out;
for (std::string_view leaving_server : leaving_arr)
{
int id;
if (std::from_chars(leaving_server.begin(), leaving_server.end(), id).ec != std::error_code{})
return {};
if (remove_ids.contains(id))
continue;
if (auto ptr = cfg->get_server(id))
out.emplace_back(RemoveRaftServer{.id = id});
else
return {};
remove_ids.emplace(id);
}
return out;
}
String serializeClusterConfig(const ClusterConfigPtr & cfg, const ClusterUpdateActions & updates)
{
RaftServers new_config;
std::unordered_set<int> remove_update_ids;
for (const auto & update : updates)
{
if (const auto * add = std::get_if<AddRaftServer>(&update))
new_config.emplace_back(*add);
else if (const auto * remove = std::get_if<RemoveRaftServer>(&update))
remove_update_ids.insert(remove->id);
else if (const auto * priority = std::get_if<UpdateRaftServerPriority>(&update))
{
remove_update_ids.insert(priority->id);
new_config.emplace_back(RaftServerConfig{*cfg->get_server(priority->id)});
}
else
UNREACHABLE();
}
for (const auto & item : cfg->get_servers())
if (!remove_update_ids.contains(item->get_id()))
new_config.emplace_back(RaftServerConfig{*item});
return fmt::format("{}", fmt::join(new_config.begin(), new_config.end(), "\n"));
}
}
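A minimal sketch (not part of the commit) of how these helpers might be exercised, assuming `ClusterConfigPtr` is `nuraft::ptr<nuraft::cluster_config>` and that the NuRaft `srv_config(id, endpoint)` constructor is available; the ids and endpoints are illustrative.

```cpp
#include <Coordination/KeeperReconfiguration.h>
#include <libnuraft/nuraft.hxx>

void reconfigurationHelpersExample()
{
    auto cfg = nuraft::cs_new<nuraft::cluster_config>();
    cfg->get_servers().push_back(nuraft::cs_new<nuraft::srv_config>(1, "host1:9234"));
    cfg->get_servers().push_back(nuraft::cs_new<nuraft::srv_config>(2, "host2:9234"));

    /// "2" asks to remove server 2. Unknown ids are rejected, and asking to remove
    /// at least as many servers as currently exist yields an empty action list.
    auto updates = DB::leavingToClusterUpdates(cfg, "2");

    /// Serialize the configuration that would result from applying `updates` to `cfg`.
    auto serialized = DB::serializeClusterConfig(cfg, updates);
    (void)serialized;
}
```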

View File

@ -0,0 +1,10 @@
#pragma once
#include "Coordination/KeeperSnapshotManager.h"
#include "Coordination/RaftServerConfig.h"
namespace DB
{
ClusterUpdateActions joiningToClusterUpdates(const ClusterConfigPtr & cfg, std::string_view joining);
ClusterUpdateActions leavingToClusterUpdates(const ClusterConfigPtr & cfg, std::string_view leaving);
String serializeClusterConfig(const ClusterConfigPtr & cfg, const ClusterUpdateActions & updates = {});
}

View File

@ -27,6 +27,7 @@
#include <Common/Stopwatch.h>
#include <Common/getMultipleKeysFromConfig.h>
#include <Disks/DiskLocal.h>
#include <fmt/chrono.h>
namespace DB
{
@ -40,6 +41,8 @@ namespace ErrorCodes
extern const int INVALID_CONFIG_PARAMETER;
}
using namespace std::chrono_literals;
namespace
{
@ -118,6 +121,7 @@ KeeperServer::KeeperServer(
, is_recovering(config.getBool("keeper_server.force_recovery", false))
, keeper_context{std::move(keeper_context_)}
, create_snapshot_on_exit(config.getBool("keeper_server.create_snapshot_on_exit", true))
, enable_reconfiguration(config.getBool("keeper_server.enable_reconfiguration", false))
{
if (coordination_settings->quorum_reads)
LOG_WARNING(log, "Quorum reads enabled, Keeper will work slower.");
@ -450,7 +454,7 @@ void KeeperServer::shutdownRaftServer()
size_t count = 0;
while (asio_service->get_active_workers() != 0 && count < timeout * 100)
{
std::this_thread::sleep_for(std::chrono::milliseconds(10));
std::this_thread::sleep_for(10ms);
count++;
}
}
@ -715,10 +719,12 @@ nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type typ
if (next_index < last_commited || next_index - last_commited <= 1)
commited_store = true;
auto set_initialized = [this]()
auto set_initialized = [this]
{
std::lock_guard lock(initialized_mutex);
initialized_flag = true;
{
std::lock_guard lock(initialized_mutex);
initialized_flag = true;
}
initialized_cv.notify_all();
};
@ -783,9 +789,42 @@ std::vector<int64_t> KeeperServer::getDeadSessions()
return state_machine->getDeadSessions();
}
ConfigUpdateActions KeeperServer::getConfigurationDiff(const Poco::Util::AbstractConfiguration & config)
bool KeeperServer::applyConfigUpdate(const ClusterUpdateAction& action)
{
auto diff = state_manager->getConfigurationDiff(config);
std::lock_guard _{server_write_mutex};
if (const auto* add = std::get_if<AddRaftServer>(&action))
return raft_instance->get_srv_config(add->id) != nullptr
|| raft_instance->add_srv(static_cast<nuraft::srv_config>(*add))->get_accepted();
else if (const auto* remove = std::get_if<RemoveRaftServer>(&action))
{
if (isLeader() && remove->id == state_manager->server_id())
{
raft_instance->yield_leadership();
return false;
}
return raft_instance->get_srv_config(remove->id) == nullptr
|| raft_instance->remove_srv(remove->id)->get_accepted();
}
else if (const auto* update = std::get_if<UpdateRaftServerPriority>(&action))
{
if (auto ptr = raft_instance->get_srv_config(update->id); ptr == nullptr)
throw Exception(ErrorCodes::RAFT_ERROR,
"Attempt to apply {} but server is not present in Raft",
action);
else if (ptr->get_priority() == update->priority)
return true;
raft_instance->set_priority(update->id, update->priority, /*broadcast on live leader*/true);
return true;
}
UNREACHABLE();
}
ClusterUpdateActions KeeperServer::getRaftConfigurationDiff(const Poco::Util::AbstractConfiguration & config)
{
auto diff = state_manager->getRaftConfigurationDiff(config);
if (!diff.empty())
{
@ -796,160 +835,103 @@ ConfigUpdateActions KeeperServer::getConfigurationDiff(const Poco::Util::Abstrac
return diff;
}
void KeeperServer::applyConfigurationUpdate(const ConfigUpdateAction & task)
void KeeperServer::applyConfigUpdateWithReconfigDisabled(const ClusterUpdateAction& action)
{
std::lock_guard lock{server_write_mutex};
if (is_recovering)
return;
std::lock_guard _{server_write_mutex};
if (is_recovering) return;
constexpr auto sleep_time = 500ms;
size_t sleep_ms = 500;
if (task.action_type == ConfigUpdateActionType::AddServer)
LOG_INFO(log, "Will try to apply {}", action);
auto applied = [&] { LOG_INFO(log, "Applied {}", action); };
auto not_leader = [&] { LOG_INFO(log, "Not leader anymore, aborting"); };
auto backoff_on_refusal = [&](size_t i)
{
LOG_INFO(log, "Update was not accepted (try {}), backing off for {}", i + 1, sleep_time * (i + 1));
std::this_thread::sleep_for(sleep_time * (i + 1));
};
if (const auto* add = std::get_if<AddRaftServer>(&action))
{
LOG_INFO(log, "Will try to add server with id {}", task.server->get_id());
bool added = false;
for (size_t i = 0; i < coordination_settings->configuration_change_tries_count && !is_recovering; ++i)
{
if (raft_instance->get_srv_config(task.server->get_id()) != nullptr)
{
LOG_INFO(log, "Server with id {} was successfully added", task.server->get_id());
added = true;
break;
}
if (raft_instance->get_srv_config(add->id) != nullptr)
return applied();
if (!isLeader())
{
LOG_INFO(log, "We are not leader anymore, will not try to add server {}", task.server->get_id());
break;
}
auto result = raft_instance->add_srv(*task.server);
if (!result->get_accepted())
LOG_INFO(
log,
"Command to add server {} was not accepted for the {} time, will sleep for {} ms and retry",
task.server->get_id(),
i + 1,
sleep_ms * (i + 1));
std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms * (i + 1)));
return not_leader();
if (!raft_instance->add_srv(static_cast<nuraft::srv_config>(*add))->get_accepted())
backoff_on_refusal(i);
}
if (!added)
throw Exception(
ErrorCodes::RAFT_ERROR,
"Configuration change to add server (id {}) was not accepted by RAFT after all {} retries",
task.server->get_id(),
coordination_settings->configuration_change_tries_count);
}
else if (task.action_type == ConfigUpdateActionType::RemoveServer)
else if (const auto* remove = std::get_if<RemoveRaftServer>(&action))
{
LOG_INFO(log, "Will try to remove server with id {}", task.server->get_id());
bool removed = false;
if (task.server->get_id() == state_manager->server_id())
if (remove->id == state_manager->server_id())
{
LOG_INFO(
log,
"Trying to remove leader node (ourself), so will yield leadership and some other node (new leader) will try remove us. "
LOG_INFO(log,
"Trying to remove leader node (ourself), so will yield leadership and some other node "
"(new leader) will try to remove us. "
"Probably you will have to run SYSTEM RELOAD CONFIG on the new leader node");
raft_instance->yield_leadership();
return;
return raft_instance->yield_leadership();
}
for (size_t i = 0; i < coordination_settings->configuration_change_tries_count && !is_recovering; ++i)
{
if (raft_instance->get_srv_config(task.server->get_id()) == nullptr)
{
LOG_INFO(log, "Server with id {} was successfully removed", task.server->get_id());
removed = true;
break;
}
if (raft_instance->get_srv_config(remove->id) == nullptr)
return applied();
if (!isLeader())
{
LOG_INFO(log, "We are not leader anymore, will not try to remove server {}", task.server->get_id());
break;
}
auto result = raft_instance->remove_srv(task.server->get_id());
if (!result->get_accepted())
LOG_INFO(
log,
"Command to remove server {} was not accepted for the {} time, will sleep for {} ms and retry",
task.server->get_id(),
i + 1,
sleep_ms * (i + 1));
std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms * (i + 1)));
return not_leader();
if (!raft_instance->remove_srv(remove->id)->get_accepted())
backoff_on_refusal(i);
}
if (!removed)
throw Exception(
ErrorCodes::RAFT_ERROR,
"Configuration change to remove server (id {}) was not accepted by RAFT after all {} retries",
task.server->get_id(),
coordination_settings->configuration_change_tries_count);
}
else if (task.action_type == ConfigUpdateActionType::UpdatePriority)
raft_instance->set_priority(task.server->get_id(), task.server->get_priority());
else
LOG_WARNING(log, "Unknown configuration update type {}", static_cast<uint64_t>(task.action_type));
else if (const auto* update = std::get_if<UpdateRaftServerPriority>(&action))
{
raft_instance->set_priority(update->id, update->priority, /*broadcast on live leader*/true);
return;
}
throw Exception(ErrorCodes::RAFT_ERROR,
"Configuration change {} was not accepted by Raft after {} retries",
action, coordination_settings->configuration_change_tries_count);
}
bool KeeperServer::waitConfigurationUpdate(const ConfigUpdateAction & task)
bool KeeperServer::waitForConfigUpdateWithReconfigDisabled(const ClusterUpdateAction& action)
{
if (is_recovering)
return false;
if (is_recovering) return false;
constexpr auto sleep_time = 500ms;
size_t sleep_ms = 500;
if (task.action_type == ConfigUpdateActionType::AddServer)
LOG_INFO(log, "Will try to wait for {}", action);
auto applied = [&] { LOG_INFO(log, "Applied {}", action); return true; };
auto became_leader = [&] { LOG_INFO(log, "Became leader, aborting"); return false; };
auto backoff = [&](size_t i) { std::this_thread::sleep_for(sleep_time * (i + 1)); };
if (const auto* add = std::get_if<AddRaftServer>(&action))
{
LOG_INFO(log, "Will try to wait server with id {} to be added", task.server->get_id());
for (size_t i = 0; i < coordination_settings->configuration_change_tries_count && !is_recovering; ++i)
{
if (raft_instance->get_srv_config(task.server->get_id()) != nullptr)
{
LOG_INFO(log, "Server with id {} was successfully added by leader", task.server->get_id());
return true;
}
if (raft_instance->get_srv_config(add->id) != nullptr)
return applied();
if (isLeader())
{
LOG_INFO(log, "We are leader now, probably we will have to add server {}", task.server->get_id());
return false;
}
std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms * (i + 1)));
return became_leader();
backoff(i);
}
return false;
}
else if (task.action_type == ConfigUpdateActionType::RemoveServer)
else if (const auto* remove = std::get_if<RemoveRaftServer>(&action))
{
LOG_INFO(log, "Will try to wait remove of server with id {}", task.server->get_id());
for (size_t i = 0; i < coordination_settings->configuration_change_tries_count && !is_recovering; ++i)
{
if (raft_instance->get_srv_config(task.server->get_id()) == nullptr)
{
LOG_INFO(log, "Server with id {} was successfully removed by leader", task.server->get_id());
return true;
}
if (raft_instance->get_srv_config(remove->id) == nullptr)
return applied();
if (isLeader())
{
LOG_INFO(log, "We are leader now, probably we will have to remove server {}", task.server->get_id());
return false;
}
std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms * (i + 1)));
return became_leader();
backoff(i);
}
return false;
}
else if (task.action_type == ConfigUpdateActionType::UpdatePriority)
else if (std::get_if<UpdateRaftServerPriority>(&action) != nullptr)
return true;
else
LOG_WARNING(log, "Unknown configuration update type {}", static_cast<uint64_t>(task.action_type));
return true;
return false;
}
Keeper4LWInfo KeeperServer::getPartiallyFilled4LWInfo() const

View File

@ -10,12 +10,15 @@
#include <Poco/Util/AbstractConfiguration.h>
#include <Coordination/Keeper4LWInfo.h>
#include <Coordination/KeeperContext.h>
#include <Coordination/RaftServerConfig.h>
namespace DB
{
using RaftAppendResult = nuraft::ptr<nuraft::cmd_result<nuraft::ptr<nuraft::buffer>>>;
class KeeperDispatcher;
class KeeperServer
{
private:
@ -28,9 +31,10 @@ private:
nuraft::ptr<KeeperStateManager> state_manager;
struct KeeperRaftServer;
nuraft::ptr<KeeperRaftServer> raft_instance;
nuraft::ptr<KeeperRaftServer> raft_instance; // TSA_GUARDED_BY(server_write_mutex);
nuraft::ptr<nuraft::asio_service> asio_service;
std::vector<nuraft::ptr<nuraft::rpc_listener>> asio_listeners;
// because some actions can be applied
// when we are sure that there are no requests currently being
// processed (e.g. recovery) we do all write actions
@ -65,6 +69,7 @@ private:
std::shared_ptr<KeeperContext> keeper_context;
const bool create_snapshot_on_exit;
const bool enable_reconfiguration;
public:
KeeperServer(
@ -84,6 +89,7 @@ public:
void putLocalReadRequest(const KeeperStorage::RequestForSession & request);
bool isRecovering() const { return is_recovering; }
bool reconfigEnabled() const { return enable_reconfiguration; }
/// Put batch of requests into Raft and get result of put. Responses will be set separately into
/// responses_queue.
@ -122,17 +128,12 @@ public:
int getServerID() const { return server_id; }
/// Get configuration diff between current configuration in RAFT and in XML file
ConfigUpdateActions getConfigurationDiff(const Poco::Util::AbstractConfiguration & config);
bool applyConfigUpdate(const ClusterUpdateAction& action);
/// Apply action for configuration update. Actually call raft_instance->remove_srv or raft_instance->add_srv.
/// Synchronously check for update results with retries.
void applyConfigurationUpdate(const ConfigUpdateAction & task);
/// Wait configuration update for action. Used by followers.
/// Return true if update was successfully received.
bool waitConfigurationUpdate(const ConfigUpdateAction & task);
// TODO (myrrc) these functions should be removed once "reconfig" is stabilized
void applyConfigUpdateWithReconfigDisabled(const ClusterUpdateAction& action);
bool waitForConfigUpdateWithReconfigDisabled(const ClusterUpdateAction& action);
ClusterUpdateActions getRaftConfigurationDiff(const Poco::Util::AbstractConfiguration & config);
uint64_t createSnapshot();

View File

@ -2,17 +2,20 @@
#include <future>
#include <Coordination/KeeperSnapshotManager.h>
#include <Coordination/KeeperStateMachine.h>
#include <Coordination/KeeperDispatcher.h>
#include <Coordination/ReadBufferFromNuraftBuffer.h>
#include <Coordination/WriteBufferFromNuraftBuffer.h>
#include <IO/ReadHelpers.h>
#include <base/defines.h>
#include <base/errnoToString.h>
#include <base/move_extend.h>
#include <sys/mman.h>
#include <Common/ProfileEvents.h>
#include <Common/ZooKeeper/ZooKeeperCommon.h>
#include <Common/ZooKeeper/ZooKeeperIO.h>
#include <Common/logger_useful.h>
#include "Coordination/KeeperStorage.h"
#include "Coordination/KeeperReconfiguration.h"
#include <Disks/DiskLocal.h>
@ -146,7 +149,7 @@ void assertDigest(
"Digest for nodes is not matching after {} request of type '{}'.\nExpected digest - {}, actual digest - {} (digest "
"{}). Keeper will terminate to avoid inconsistencies.\nExtra information about the request:\n{}",
committing ? "committing" : "preprocessing",
Coordination::toString(request.getOpNum()),
request.getOpNum(),
first.value,
second.value,
first.version,
@ -261,7 +264,8 @@ std::shared_ptr<KeeperStorage::RequestForSession> KeeperStateMachine::parseReque
bool KeeperStateMachine::preprocess(const KeeperStorage::RequestForSession & request_for_session)
{
if (request_for_session.request->getOpNum() == Coordination::OpNum::SessionID)
const auto op_num = request_for_session.request->getOpNum();
if (op_num == Coordination::OpNum::SessionID || op_num == Coordination::OpNum::Reconfig)
return true;
std::lock_guard lock(storage_and_responses_lock);
@ -291,14 +295,89 @@ bool KeeperStateMachine::preprocess(const KeeperStorage::RequestForSession & req
return true;
}
KeeperStorage::ResponseForSession KeeperStateMachine::processReconfiguration(
const KeeperStorage::RequestForSession& request_for_session)
{
const auto& request = static_cast<const Coordination::ZooKeeperReconfigRequest&>(*request_for_session.request);
const int64_t session_id = request_for_session.session_id;
const int64_t zxid = request_for_session.zxid;
using enum Coordination::Error;
auto bad_request = [&](Coordination::Error code = ZBADARGUMENTS) -> KeeperStorage::ResponseForSession
{
auto res = std::make_shared<Coordination::ZooKeeperReconfigResponse>();
res->xid = request.xid;
res->zxid = zxid;
res->error = code;
return { session_id, std::move(res) };
};
KeeperDispatcher& dispatcher = *keeper_context->dispatcher;
if (!dispatcher.reconfigEnabled())
return bad_request(ZUNIMPLEMENTED);
if (!dispatcher.clusterUpdateQueueEmpty())
return bad_request(ZRECONFIGINPROGRESS);
if (request.version != -1)
return bad_request(ZBADVERSION);
const bool has_new_members = !request.new_members.empty();
const bool has_joining = !request.joining.empty();
const bool has_leaving = !request.leaving.empty();
const bool incremental_reconfig = (has_joining || has_leaving) && !has_new_members;
if (!incremental_reconfig)
return bad_request();
const ClusterConfigPtr config = getClusterConfig();
if (!config) // Server can be uninitialized yet
return bad_request();
ClusterUpdateActions updates;
if (has_joining)
{
if (auto join_updates = joiningToClusterUpdates(config, request.joining); !join_updates.empty())
moveExtend(updates, std::move(join_updates));
else
return bad_request();
}
if (has_leaving)
{
if (auto leave_updates = leavingToClusterUpdates(config, request.leaving); !leave_updates.empty())
moveExtend(updates, std::move(leave_updates));
else
return bad_request();
}
auto response = std::make_shared<Coordination::ZooKeeperReconfigResponse>();
response->xid = request.xid;
response->zxid = zxid;
response->error = Coordination::Error::ZOK;
response->value = serializeClusterConfig(config, updates);
dispatcher.pushClusterUpdates(std::move(updates));
return { session_id, std::move(response) };
}
nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, nuraft::buffer & data)
{
auto request_for_session = parseRequest(data, true);
if (!request_for_session->zxid)
request_for_session->zxid = log_idx;
/// Special processing of session_id request
if (request_for_session->request->getOpNum() == Coordination::OpNum::SessionID)
auto try_push = [this](const KeeperStorage::ResponseForSession& response)
{
if (!responses_queue.push(response))
{
ProfileEvents::increment(ProfileEvents::KeeperCommitsFailed);
LOG_WARNING(log,
"Failed to push response with session id {} to the queue, probably because of shutdown",
response.session_id);
}
};
const auto op_num = request_for_session->request->getOpNum();
if (op_num == Coordination::OpNum::SessionID)
{
const Coordination::ZooKeeperSessionIDRequest & session_id_request
= dynamic_cast<const Coordination::ZooKeeperSessionIDRequest &>(*request_for_session->request);
@ -309,21 +388,24 @@ nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, n
KeeperStorage::ResponseForSession response_for_session;
response_for_session.session_id = -1;
response_for_session.response = response;
{
std::lock_guard lock(storage_and_responses_lock);
session_id = storage->getSessionID(session_id_request.session_timeout_ms);
LOG_DEBUG(log, "Session ID response {} with timeout {}", session_id, session_id_request.session_timeout_ms);
response->session_id = session_id;
if (!responses_queue.push(response_for_session))
{
ProfileEvents::increment(ProfileEvents::KeeperCommitsFailed);
LOG_WARNING(log, "Failed to push response with session id {} to the queue, probably because of shutdown", session_id);
}
}
std::lock_guard lock(storage_and_responses_lock);
session_id = storage->getSessionID(session_id_request.session_timeout_ms);
LOG_DEBUG(log, "Session ID response {} with timeout {}", session_id, session_id_request.session_timeout_ms);
response->session_id = session_id;
try_push(response_for_session);
}
// Processing a reconfig request as an ordinary one (in KeeperStorage) brings multiple inconsistencies
// regarding replays of old reconfigurations in new nodes. Thus the storage is not involved.
// See https://github.com/ClickHouse/ClickHouse/pull/49450 for details
else if (op_num == Coordination::OpNum::Reconfig)
{
std::lock_guard lock(storage_and_responses_lock);
try_push(processReconfiguration(*request_for_session));
}
else
{
if (request_for_session->request->getOpNum() == Coordination::OpNum::Close)
if (op_num == Coordination::OpNum::Close)
{
std::lock_guard lock(request_cache_mutex);
parsed_request_cache.erase(request_for_session->session_id);
@ -333,14 +415,7 @@ nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, n
KeeperStorage::ResponsesForSessions responses_for_sessions
= storage->processRequest(request_for_session->request, request_for_session->session_id, request_for_session->zxid);
for (auto & response_for_session : responses_for_sessions)
if (!responses_queue.push(response_for_session))
{
ProfileEvents::increment(ProfileEvents::KeeperCommitsFailed);
LOG_WARNING(
log,
"Failed to push response with session id {} to the queue, probably because of shutdown",
response_for_session.session_id);
}
try_push(response_for_session);
if (keeper_context->digestEnabled() && request_for_session->digest)
assertDigest(*request_for_session->digest, storage->getNodesDigest(true), *request_for_session->request, true);
@ -782,5 +857,4 @@ void KeeperStateMachine::recalculateStorageStats()
storage->recalculateStats();
LOG_INFO(log, "Done recalculating storage stats");
}
}

View File

@ -12,6 +12,7 @@
namespace DB
{
class KeeperDispatcher;
using ResponsesQueue = ConcurrentBoundedQueue<KeeperStorage::ResponseForSession>;
using SnapshotsQueue = ConcurrentBoundedQueue<CreateSnapshotTask>;
@ -67,7 +68,9 @@ public:
// (can happen in case of exception during preprocessing)
void rollbackRequest(const KeeperStorage::RequestForSession & request_for_session, bool allow_missing);
void rollbackRequestNoLock(const KeeperStorage::RequestForSession & request_for_session, bool allow_missing);
void rollbackRequestNoLock(
const KeeperStorage::RequestForSession & request_for_session,
bool allow_missing) TSA_NO_THREAD_SAFETY_ANALYSIS;
uint64_t last_commit_index() override { return last_committed_idx; }
@ -87,8 +90,10 @@ public:
int read_logical_snp_obj(
nuraft::snapshot & s, void *& user_snp_ctx, uint64_t obj_id, nuraft::ptr<nuraft::buffer> & data_out, bool & is_last_obj) override;
/// just for test
KeeperStorage & getStorage() { return *storage; }
KeeperStorage & getStorageForUnitTests() TSA_NO_THREAD_SAFETY_ANALYSIS
{
return *storage;
}
void shutdownStorage();
@ -122,6 +127,7 @@ public:
uint64_t getLatestSnapshotBufSize() const;
void recalculateStorageStats();
private:
CommitCallback commit_callback;
/// In our state machine we always have a single snapshot which is stored
@ -133,7 +139,7 @@ private:
CoordinationSettingsPtr coordination_settings;
/// Main state machine logic
KeeperStoragePtr storage;
KeeperStoragePtr storage TSA_PT_GUARDED_BY(storage_and_responses_lock);
/// Save/Load and Serialize/Deserialize logic for snapshots.
KeeperSnapshotManager snapshot_manager;
@ -178,6 +184,8 @@ private:
KeeperContextPtr keeper_context;
KeeperSnapshotManagerS3 * snapshot_manager_s3;
KeeperStorage::ResponseForSession processReconfiguration(const KeeperStorage::RequestForSession & request_for_session);
};
}

View File

@ -451,7 +451,7 @@ nuraft::ptr<nuraft::srv_state> KeeperStateManager::read_state()
return nullptr;
}
ConfigUpdateActions KeeperStateManager::getConfigurationDiff(const Poco::Util::AbstractConfiguration & config) const
ClusterUpdateActions KeeperStateManager::getRaftConfigurationDiff(const Poco::Util::AbstractConfiguration & config) const
{
auto new_configuration_wrapper = parseServersConfiguration(config, true);
@ -465,14 +465,14 @@ ConfigUpdateActions KeeperStateManager::getConfigurationDiff(const Poco::Util::A
old_ids[old_server->get_id()] = old_server;
}
ConfigUpdateActions result;
ClusterUpdateActions result;
/// First of all add new servers
for (const auto & [new_id, server_config] : new_ids)
{
auto old_server_it = old_ids.find(new_id);
if (old_server_it == old_ids.end())
result.emplace_back(ConfigUpdateAction{ConfigUpdateActionType::AddServer, server_config});
result.emplace_back(AddRaftServer{RaftServerConfig{*server_config}});
else
{
const auto & old_endpoint = old_server_it->second->get_endpoint();
@ -491,10 +491,8 @@ ConfigUpdateActions KeeperStateManager::getConfigurationDiff(const Poco::Util::A
/// After that remove old ones
for (auto [old_id, server_config] : old_ids)
{
if (!new_ids.contains(old_id))
result.emplace_back(ConfigUpdateAction{ConfigUpdateActionType::RemoveServer, server_config});
}
result.emplace_back(RemoveRaftServer{old_id});
{
std::lock_guard lock(configuration_wrapper_mutex);
@ -507,7 +505,10 @@ ConfigUpdateActions KeeperStateManager::getConfigurationDiff(const Poco::Util::A
{
if (old_server->get_priority() != new_server->get_priority())
{
result.emplace_back(ConfigUpdateAction{ConfigUpdateActionType::UpdatePriority, new_server});
result.emplace_back(UpdateRaftServerPriority{
.id = new_server->get_id(),
.priority = new_server->get_priority()
});
}
break;
}
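The hunk above boils down to a set comparison over server ids: add servers that only exist in the new config, remove those that disappeared, and propagate priority changes for the rest. A minimal Python sketch of that idea, for illustration only (the real code operates on nuraft::srv_config objects and the ClusterUpdateAction variant introduced below):
def cluster_diff(old, new):
    """old/new map server id -> (endpoint, priority); returns a list of update actions."""
    actions = []
    # New servers are added first, mirroring getRaftConfigurationDiff above.
    for sid, (endpoint, priority) in new.items():
        if sid not in old:
            actions.append(("add", sid, endpoint, priority))
    # Servers absent from the new config are removed afterwards.
    for sid in old:
        if sid not in new:
            actions.append(("remove", sid))
    # For servers present in both configs only a priority change is propagated.
    for sid, (endpoint, priority) in new.items():
        if sid in old and old[sid][1] != priority:
            actions.append(("update_priority", sid, priority))
    return actions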

View File

@ -7,31 +7,13 @@
#include <libnuraft/nuraft.hxx>
#include <Poco/Util/AbstractConfiguration.h>
#include "Coordination/KeeperStateMachine.h"
#include "Coordination/RaftServerConfig.h"
#include <Coordination/KeeperSnapshotManager.h>
namespace DB
{
using KeeperServerConfigPtr = nuraft::ptr<nuraft::srv_config>;
/// When our configuration changes the following action types
/// can happen
enum class ConfigUpdateActionType
{
RemoveServer,
AddServer,
UpdatePriority,
};
/// Action to update configuration
struct ConfigUpdateAction
{
ConfigUpdateActionType action_type;
KeeperServerConfigPtr server;
};
using ConfigUpdateActions = std::vector<ConfigUpdateAction>;
/// Responsible for managing our and cluster configuration
class KeeperStateManager : public nuraft::state_mgr
{
@ -74,7 +56,11 @@ public:
int32_t server_id() override { return my_server_id; }
nuraft::ptr<nuraft::srv_config> get_srv_config() const { return configuration_wrapper.config; } /// NOLINT
nuraft::ptr<nuraft::srv_config> get_srv_config() const
{
std::lock_guard lk(configuration_wrapper_mutex);
return configuration_wrapper.config;
}
void system_exit(const int exit_code) override; /// NOLINT
@ -106,8 +92,8 @@ public:
/// Read all log entries in log store from the beginning and return latest config (with largest log_index)
ClusterConfigPtr getLatestConfigFromLogStore() const;
/// Get configuration diff between proposed XML and current state in RAFT
ConfigUpdateActions getConfigurationDiff(const Poco::Util::AbstractConfiguration & config) const;
// TODO (myrrc) This should be removed once "reconfig" is stabilized
ClusterUpdateActions getRaftConfigurationDiff(const Poco::Util::AbstractConfiguration & config) const;
private:
const String & getOldServerStatePath();
@ -133,7 +119,7 @@ private:
std::string config_prefix;
mutable std::mutex configuration_wrapper_mutex;
KeeperConfigurationWrapper configuration_wrapper;
KeeperConfigurationWrapper configuration_wrapper TSA_GUARDED_BY(configuration_wrapper_mutex);
nuraft::ptr<KeeperLogStore> log_store;

View File

@ -20,10 +20,10 @@
#include <Coordination/pathUtils.h>
#include <Coordination/KeeperConstants.h>
#include <Coordination/KeeperReconfiguration.h>
#include <Coordination/KeeperStorage.h>
#include <Coordination/KeeperDispatcher.h>
#include <sstream>
#include <iomanip>
#include <mutex>
#include <functional>
#include <base/defines.h>
@ -53,7 +53,6 @@ namespace ErrorCodes
namespace
{
String getSHA1(const String & userdata)
{
Poco::SHA1Engine engine;
@ -1060,7 +1059,8 @@ struct KeeperStorageGetRequestProcessor final : public KeeperStorageRequestProce
ProfileEvents::increment(ProfileEvents::KeeperGetRequest);
Coordination::ZooKeeperGetRequest & request = dynamic_cast<Coordination::ZooKeeperGetRequest &>(*zk_request);
if (request.path == Coordination::keeper_api_feature_flags_path)
if (request.path == Coordination::keeper_api_feature_flags_path
|| request.path == Coordination::keeper_config_path)
return {};
if (!storage.uncommitted_state.getNode(request.path))
@ -1085,6 +1085,14 @@ struct KeeperStorageGetRequestProcessor final : public KeeperStorageRequestProce
}
}
if (request.path == Coordination::keeper_config_path)
{
response.data = serializeClusterConfig(
storage.keeper_context->dispatcher->getStateMachine().getClusterConfig());
response.error = Coordination::Error::ZOK;
return response_ptr;
}
auto & container = storage.container;
auto node_it = container.find(request.path);
if (node_it == container.end())
@ -1784,7 +1792,7 @@ struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestPro
throw DB::Exception(
ErrorCodes::BAD_ARGUMENTS,
"Illegal command as part of multi ZooKeeper request {}",
Coordination::toString(sub_zk_request->getOpNum()));
sub_zk_request->getOpNum());
}
}
@ -1975,7 +1983,7 @@ public:
{
auto request_it = op_num_to_request.find(zk_request->getOpNum());
if (request_it == op_num_to_request.end())
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Unknown operation type {}", toString(zk_request->getOpNum()));
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Unknown operation type {}", zk_request->getOpNum());
return request_it->second(zk_request);
}
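Since the Get handler above now serves keeper_config_path straight from the state machine's cluster config, any ZooKeeper client can read the live membership with a plain get. A minimal kazoo sketch (the node1:9181 endpoint is an assumption borrowed from the integration test configs further below):
from kazoo.client import KazooClient

zk = KazooClient(hosts="node1:9181")  # assumed endpoint, see the test configs below
zk.start()
# /keeper/config holds one "server.N=host:port;role;priority" line per cluster member,
# produced by serializeClusterConfig / the fmt::formatter defined in this commit.
config, stat = zk.get("/keeper/config")
print(config.decode("utf-8"))
zk.stop()
zk.close()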

View File

@ -0,0 +1,96 @@
#include "RaftServerConfig.h"
#include <charconv>
#include <system_error>
#include <unordered_set>
#include <base/find_symbols.h>
namespace DB
{
RaftServerConfig::RaftServerConfig(const nuraft::srv_config & cfg) noexcept
: id(cfg.get_id()), endpoint(cfg.get_endpoint()), learner(cfg.is_learner()), priority(cfg.get_priority())
{
}
RaftServerConfig::operator nuraft::srv_config() const noexcept
{
return {id, 0, endpoint, "", learner, priority};
}
std::optional<RaftServerConfig> RaftServerConfig::parse(std::string_view server) noexcept
{
std::vector<std::string_view> parts;
splitInto<';', '='>(parts, server);
const bool with_id_endpoint = parts.size() == 2;
const bool with_server_type = parts.size() == 3;
const bool with_priority = parts.size() == 4;
if (!with_id_endpoint && !with_server_type && !with_priority)
return std::nullopt;
const std::string_view id_str = parts[0];
if (!id_str.starts_with("server."))
return std::nullopt;
int id;
if (std::from_chars(std::next(id_str.begin(), 7), id_str.end(), id).ec != std::error_code{})
return std::nullopt;
if (id <= 0)
return std::nullopt;
const std::string_view endpoint = parts[1];
const size_t port_delimiter = endpoint.find_last_of(':');
if (port_delimiter == std::string::npos)
return {};
const std::string_view port = endpoint.substr(port_delimiter + 1);
uint16_t port_tmp;
if (std::from_chars(port.begin(), port.end(), port_tmp).ec != std::error_code{})
return std::nullopt;
RaftServerConfig out{id, endpoint};
if (with_id_endpoint)
return out;
if (parts[2] != "learner" && parts[2] != "participant")
return std::nullopt;
out.learner = parts[2] == "learner";
if (with_server_type)
return out;
const std::string_view priority = parts[3];
if (std::from_chars(priority.begin(), priority.end(), out.priority).ec != std::error_code{})
return std::nullopt;
if (out.priority < 0)
return std::nullopt;
return out;
}
RaftServers parseRaftServers(std::string_view servers)
{
std::vector<std::string_view> server_arr;
std::unordered_set<int> ids;
std::unordered_set<String> endpoints;
RaftServers out;
for (auto & server : splitInto<','>(server_arr, servers))
if (auto maybe_server = RaftServerConfig::parse(server))
{
String endpoint = maybe_server->endpoint;
if (endpoints.contains(endpoint))
return {};
const int id = maybe_server->id;
if (ids.contains(id))
return {};
out.emplace_back(std::move(*maybe_server));
endpoints.emplace(std::move(endpoint));
ids.emplace(id);
}
else
return {};
return out;
}
}
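For reference, a hedged Python restatement of the grammar RaftServerConfig::parse accepts, handy for eyeballing the unit-test expectations below (illustration only, not the implementation, which may differ in corner cases):
import re

def parse_raft_server(server):
    """Parse 'server.<id>=<host>:<port>[;learner|participant][;<priority>]'; None on error."""
    parts = re.split("[;=]", server)  # same delimiters as splitInto<';', '='> above
    if len(parts) not in (2, 3, 4) or not parts[0].startswith("server."):
        return None
    try:
        server_id = int(parts[0][len("server."):])
        priority = int(parts[3]) if len(parts) == 4 else 1
    except ValueError:
        return None
    if server_id <= 0 or priority < 0 or ":" not in parts[1]:
        return None
    if len(parts) >= 3 and parts[2] not in ("learner", "participant"):
        return None
    return server_id, parts[1], len(parts) >= 3 and parts[2] == "learner", priority

# e.g. parse_raft_server("server.1=host:999;learner;25") -> (1, 'host:999', True, 25)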

View File

@ -0,0 +1,78 @@
#pragma once
#include <base/defines.h>
#include <base/types.h>
#include <fmt/core.h>
#include <libnuraft/srv_config.hxx>
namespace DB
{
// default- and copy-constructible version of nuraft::srv_config
struct RaftServerConfig
{
int id;
String endpoint;
bool learner;
int priority;
constexpr RaftServerConfig() = default;
constexpr RaftServerConfig(int id_, std::string_view endpoint_, bool learner_ = false, int priority_ = 1)
: id(id_), endpoint(endpoint_), learner(learner_), priority(priority_)
{
}
constexpr bool operator==(const RaftServerConfig &) const = default;
explicit RaftServerConfig(const nuraft::srv_config & cfg) noexcept;
explicit operator nuraft::srv_config() const noexcept;
/// Parse server in format "server.id=host:port[;learner][;priority]"
static std::optional<RaftServerConfig> parse(std::string_view server) noexcept;
};
using RaftServers = std::vector<RaftServerConfig>;
/// Parse comma-delimited servers. Check for duplicate endpoints and ids.
/// @returns {} on parsing or validation error.
RaftServers parseRaftServers(std::string_view servers);
struct AddRaftServer : RaftServerConfig
{
};
struct RemoveRaftServer
{
int id;
};
struct UpdateRaftServerPriority
{
int id;
int priority;
};
using ClusterUpdateAction = std::variant<AddRaftServer, RemoveRaftServer, UpdateRaftServerPriority>;
using ClusterUpdateActions = std::vector<ClusterUpdateAction>;
}
template <>
struct fmt::formatter<DB::RaftServerConfig> : fmt::formatter<string_view>
{
constexpr auto format(const DB::RaftServerConfig & server, format_context & ctx)
{
return fmt::format_to(
ctx.out(), "server.{}={};{};{}", server.id, server.endpoint, server.learner ? "learner" : "participant", server.priority);
}
};
template <>
struct fmt::formatter<DB::ClusterUpdateAction> : fmt::formatter<string_view>
{
constexpr auto format(const DB::ClusterUpdateAction & action, format_context & ctx)
{
if (const auto * add = std::get_if<DB::AddRaftServer>(&action))
return fmt::format_to(ctx.out(), "(Add server {})", add->id);
if (const auto * remove = std::get_if<DB::RemoveRaftServer>(&action))
return fmt::format_to(ctx.out(), "(Remove server {})", remove->id);
if (const auto * update = std::get_if<DB::UpdateRaftServerPriority>(&action))
return fmt::format_to(ctx.out(), "(Change server {} priority to {})", update->id, update->priority);
UNREACHABLE();
}
};

View File

@ -84,6 +84,47 @@ protected:
}
};
TEST_P(CoordinationTest, RaftServerConfigParse)
{
auto parse = Coordination::RaftServerConfig::parse;
using Cfg = std::optional<DB::RaftServerConfig>;
EXPECT_EQ(parse(""), std::nullopt);
EXPECT_EQ(parse("="), std::nullopt);
EXPECT_EQ(parse("=;"), std::nullopt);
EXPECT_EQ(parse("=;;"), std::nullopt);
EXPECT_EQ(parse("=:80"), std::nullopt);
EXPECT_EQ(parse("server."), std::nullopt);
EXPECT_EQ(parse("server.=:80"), std::nullopt);
EXPECT_EQ(parse("server.-5=1:2"), std::nullopt);
EXPECT_EQ(parse("server.1=host;-123"), std::nullopt);
EXPECT_EQ(parse("server.1=host:999"), (Cfg{{1, "host:999"}}));
EXPECT_EQ(parse("server.1=host:999;learner"), (Cfg{{1, "host:999", true}}));
EXPECT_EQ(parse("server.1=host:999;participant"), (Cfg{{1, "host:999", false}}));
EXPECT_EQ(parse("server.1=host:999;learner;25"), (Cfg{{1, "host:999", true, 25}}));
EXPECT_EQ(parse("server.1=127.0.0.1:80"), (Cfg{{1, "127.0.0.1:80"}}));
EXPECT_EQ(
parse("server.1=2001:0db8:85a3:0000:0000:8a2e:0370:7334:80"),
(Cfg{{1, "2001:0db8:85a3:0000:0000:8a2e:0370:7334:80"}}));
}
TEST_P(CoordinationTest, RaftServerClusterConfigParse)
{
auto parse = Coordination::parseRaftServers;
using Cfg = DB::RaftServerConfig;
using Servers = DB::RaftServers;
EXPECT_EQ(parse(""), Servers{});
EXPECT_EQ(parse(","), Servers{});
EXPECT_EQ(parse("1,2"), Servers{});
EXPECT_EQ(parse("server.1=host:80,server.1=host2:80"), Servers{});
EXPECT_EQ(parse("server.1=host:80,server.2=host:80"), Servers{});
EXPECT_EQ(
parse("server.1=host:80,server.2=host:81"),
(Servers{Cfg{1, "host:80"}, Cfg{2, "host:81"}}));
}
TEST_P(CoordinationTest, BuildTest)
{
DB::InMemoryLogStore store;
@ -1575,8 +1616,8 @@ void testLogAndStateMachine(
restore_machine->commit(i, changelog.entry_at(i)->get_buf());
}
auto & source_storage = state_machine->getStorage();
auto & restored_storage = restore_machine->getStorage();
auto & source_storage = state_machine->getStorageForUnitTests();
auto & restored_storage = restore_machine->getStorageForUnitTests();
EXPECT_EQ(source_storage.container.size(), restored_storage.container.size());
for (size_t i = 1; i < total_logs + 1; ++i)
@ -1678,7 +1719,7 @@ TEST_P(CoordinationTest, TestEphemeralNodeRemove)
auto entry_c = getLogEntryFromZKRequest(0, 1, state_machine->getNextZxid(), request_c);
state_machine->pre_commit(1, entry_c->get_buf());
state_machine->commit(1, entry_c->get_buf());
const auto & storage = state_machine->getStorage();
const auto & storage = state_machine->getStorageForUnitTests();
EXPECT_EQ(storage.ephemerals.size(), 1);
std::shared_ptr<ZooKeeperRemoveRequest> request_d = std::make_shared<ZooKeeperRemoveRequest>();
@ -1727,7 +1768,7 @@ TEST_P(CoordinationTest, TestCreateNodeWithAuthSchemeForAclWhenAuthIsPrecommitte
auto create_entry = getLogEntryFromZKRequest(0, 1, state_machine->getNextZxid(), create_req);
state_machine->pre_commit(2, create_entry->get_buf());
const auto & uncommitted_state = state_machine->getStorage().uncommitted_state;
const auto & uncommitted_state = state_machine->getStorageForUnitTests().uncommitted_state;
ASSERT_TRUE(uncommitted_state.nodes.contains(node_path));
// commit log entries
@ -1790,7 +1831,7 @@ TEST_P(CoordinationTest, TestSetACLWithAuthSchemeForAclWhenAuthIsPrecommitted)
state_machine->commit(2, create_entry->get_buf());
state_machine->commit(3, set_acl_entry->get_buf());
const auto & uncommitted_state = state_machine->getStorage().uncommitted_state;
const auto & uncommitted_state = state_machine->getStorageForUnitTests().uncommitted_state;
auto node = uncommitted_state.getNode(node_path);
ASSERT_NE(node, nullptr);

View File

@ -73,6 +73,7 @@ NamesAndTypesList ZooKeeperLogElement::getNamesAndTypes()
{"Create", static_cast<Int16>(Coordination::OpNum::Create)},
{"Remove", static_cast<Int16>(Coordination::OpNum::Remove)},
{"Exists", static_cast<Int16>(Coordination::OpNum::Exists)},
{"Reconfig", static_cast<Int16>(Coordination::OpNum::Reconfig)},
{"Get", static_cast<Int16>(Coordination::OpNum::Get)},
{"Set", static_cast<Int16>(Coordination::OpNum::Set)},
{"GetACL", static_cast<Int16>(Coordination::OpNum::GetACL)},

View File

@ -67,7 +67,8 @@ struct HudiMetadataParser<Configuration, MetadataReadHelper>::Impl
{
auto key_file = std::filesystem::path(key);
Strings file_parts;
splitInto<'_'>(file_parts, key_file.stem());
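// splitInto() now takes std::string_view; stem() returns a std::filesystem::path, so convert it to a named String first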
const String stem = key_file.stem();
splitInto<'_'>(file_parts, stem);
if (file_parts.size() != 3)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected format for file: {}", key);

View File

@ -492,7 +492,7 @@ size_t ReplicatedMergeTreeCleanupThread::clearOldBlocks(const String & blocks_di
}
else
{
LOG_WARNING(log, "Error while deleting ZooKeeper path `{}`: {}, ignoring.", path, Coordination::errorMessage(rc));
LOG_WARNING(log, "Error while deleting ZooKeeper path `{}`: {}, ignoring.", path, rc);
}
first_outdated_block++;
}

View File

@ -494,7 +494,7 @@ void ReplicatedMergeTreeQueue::updateTimesInZooKeeper(
if (code != Coordination::Error::ZOK)
LOG_ERROR(log, "Couldn't set value of nodes for insert times "
"({}/min_unprocessed_insert_time, max_processed_insert_time): {}. "
"This shouldn't happen often.", replica_path, Coordination::errorMessage(code));
"This shouldn't happen often.", replica_path, code);
}
}
@ -551,7 +551,7 @@ void ReplicatedMergeTreeQueue::removeProcessedEntry(zkutil::ZooKeeperPtr zookeep
auto code = zookeeper->tryRemove(fs::path(replica_path) / "queue" / entry->znode_name);
if (code != Coordination::Error::ZOK)
LOG_ERROR(log, "Couldn't remove {}/queue/{}: {}. This shouldn't happen often.", replica_path, entry->znode_name, Coordination::errorMessage(code));
LOG_ERROR(log, "Couldn't remove {}/queue/{}: {}. This shouldn't happen often.", replica_path, entry->znode_name, code);
updateTimesInZooKeeper(zookeeper, min_unprocessed_insert_time_changed, max_processed_insert_time_changed);
}
@ -1144,7 +1144,7 @@ void ReplicatedMergeTreeQueue::removePartProducingOpsInRange(
auto code = zookeeper->tryRemove(fs::path(replica_path) / "queue" / znode_name);
if (code != Coordination::Error::ZOK)
LOG_INFO(log, "Couldn't remove {}: {}", (fs::path(replica_path) / "queue" / znode_name).string(), Coordination::errorMessage(code));
LOG_INFO(log, "Couldn't remove {}: {}", (fs::path(replica_path) / "queue" / znode_name).string(), code);
updateStateOnQueueEntryRemoval(
*it, /* is_successful = */ false,

View File

@ -723,7 +723,7 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
retries_ctl.setUserError(
ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR,
"Insert failed due to zookeeper error. Please retry. Reason: {}",
Coordination::errorMessage(write_part_info_keeper_error));
write_part_info_keeper_error);
}
retries_ctl.stopRetries();
@ -1033,7 +1033,7 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
retries_ctl.setUserError(
ErrorCodes::UNKNOWN_STATUS_OF_INSERT,
"Unknown status, client must retry. Reason: {}",
Coordination::errorMessage(multi_code));
multi_code);
return;
}
else if (Coordination::isUserError(multi_code))
@ -1109,7 +1109,7 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
"Unexpected logical error while adding block {} with ID '{}': {}, path {}",
block_number,
toString(block_id),
Coordination::errorMessage(multi_code),
multi_code,
failed_op_path);
}
}
@ -1122,7 +1122,7 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
"Unexpected ZooKeeper error while adding block {} with ID '{}': {}",
block_number,
toString(block_id),
Coordination::errorMessage(multi_code));
multi_code);
}
},
[&zookeeper]() { zookeeper->cleanupEphemeralNodes(); });

View File

@ -1037,7 +1037,7 @@ void StorageReplicatedMergeTree::dropReplica(zkutil::ZooKeeperPtr zookeeper, con
code = zookeeper->tryMulti(ops, res);
if (code != Coordination::Error::ZOK)
LOG_WARNING(logger, "Cannot quickly remove nodes without children: {} (replica: {}). Will remove recursively.",
Coordination::errorMessage(code), remote_replica_path);
code, remote_replica_path);
/// And finally remove everything else recursively
/// It may left some garbage if replica_path subtree is concurrently modified
@ -1145,7 +1145,7 @@ bool StorageReplicatedMergeTree::removeTableNodesFromZooKeeper(zkutil::ZooKeeper
auto code = zookeeper->tryMulti(ops, res);
if (code != Coordination::Error::ZOK)
LOG_WARNING(logger, "Cannot quickly remove nodes without children: {} (table: {}). Will remove recursively.",
Coordination::errorMessage(code), zookeeper_path);
code, zookeeper_path);
Strings children;
code = zookeeper->tryGetChildren(zookeeper_path, children);
@ -1893,7 +1893,7 @@ bool StorageReplicatedMergeTree::executeFetch(LogEntry & entry, bool need_to_che
else if (code == Coordination::Error::ZBADVERSION || code == Coordination::Error::ZNONODE || code == Coordination::Error::ZNODEEXISTS)
{
LOG_DEBUG(log, "State was changed or isn't expected when trying to mark quorum for part {} as failed. Code: {}",
entry.new_part_name, Coordination::errorMessage(code));
entry.new_part_name, code);
}
else
throw Coordination::Exception(code);
@ -3098,7 +3098,7 @@ void StorageReplicatedMergeTree::cloneReplicaIfNeeded(zkutil::ZooKeeperPtr zooke
if (get_is_lost.error != Coordination::Error::ZOK)
{
LOG_INFO(log, "Not cloning {}, cannot get '/is_lost': {}", source_replica_name, Coordination::errorMessage(get_is_lost.error));
LOG_INFO(log, "Not cloning {}, cannot get '/is_lost': {}", source_replica_name, get_is_lost.error);
continue;
}
else if (get_is_lost.data != "0")
@ -3109,12 +3109,12 @@ void StorageReplicatedMergeTree::cloneReplicaIfNeeded(zkutil::ZooKeeperPtr zooke
if (get_log_pointer.error != Coordination::Error::ZOK)
{
LOG_INFO(log, "Not cloning {}, cannot get '/log_pointer': {}", source_replica_name, Coordination::errorMessage(get_log_pointer.error));
LOG_INFO(log, "Not cloning {}, cannot get '/log_pointer': {}", source_replica_name, get_log_pointer.error);
continue;
}
if (get_queue.error != Coordination::Error::ZOK)
{
LOG_INFO(log, "Not cloning {}, cannot get '/queue': {}", source_replica_name, Coordination::errorMessage(get_queue.error));
LOG_INFO(log, "Not cloning {}, cannot get '/queue': {}", source_replica_name, get_queue.error);
continue;
}
@ -7203,7 +7203,7 @@ void StorageReplicatedMergeTree::clearBlocksInPartition(
{
for (size_t i = 0; i < delete_requests.size(); ++i)
if (delete_responses[i]->error != Coordination::Error::ZOK)
LOG_WARNING(log, "Error while deleting ZooKeeper path `{}`: {}, ignoring.", delete_requests[i]->getPath(), Coordination::errorMessage(delete_responses[i]->error));
LOG_WARNING(log, "Error while deleting ZooKeeper path `{}`: {}, ignoring.", delete_requests[i]->getPath(), delete_responses[i]->error);
}
LOG_TRACE(log, "Deleted {} deduplication block IDs in partition ID {}", delete_requests.size(), partition_id);
@ -8717,7 +8717,7 @@ std::pair<bool, std::optional<NameSet>> getParentLockedBlobs(const ZooKeeperWith
zookeeper_ptr->tryGet(fs::path(zero_copy_part_path_prefix) / part_candidate_info_str, files_not_to_remove_str, nullptr, nullptr, &code);
if (code != Coordination::Error::ZOK)
{
LOG_TRACE(log, "Cannot get parent files from ZooKeeper on path ({}), error {}", (fs::path(zero_copy_part_path_prefix) / part_candidate_info_str).string(), errorMessage(code));
LOG_TRACE(log, "Cannot get parent files from ZooKeeper on path ({}), error {}", (fs::path(zero_copy_part_path_prefix) / part_candidate_info_str).string(), code);
return {true, std::nullopt};
}

View File

@ -1,5 +1,6 @@
import socket
import time
from kazoo.client import KazooClient
def get_keeper_socket(cluster, node, port=9181):
@ -26,9 +27,17 @@ def send_4lw_cmd(cluster, node, cmd="ruok", port=9181):
NOT_SERVING_REQUESTS_ERROR_MSG = "This instance is not currently serving requests"
def wait_until_connected(cluster, node, port=9181):
def wait_until_connected(cluster, node, port=9181, timeout=30.0):
elapsed = 0.0
while send_4lw_cmd(cluster, node, "mntr", port) == NOT_SERVING_REQUESTS_ERROR_MSG:
time.sleep(0.1)
elapsed += 0.1
if elapsed >= timeout:
raise Exception(
f"{timeout}s timeout while waiting for {node.name} to start serving requests"
)
def wait_until_quorum_lost(cluster, node, port=9181):
@ -51,3 +60,25 @@ def get_leader(cluster, nodes):
if is_leader(cluster, node):
return node
raise Exception("No leader in Keeper cluster.")
def get_fake_zk(cluster, node, timeout: float = 30.0) -> KazooClient:
_fake = KazooClient(
hosts=cluster.get_instance_ip(node.name) + ":9181", timeout=timeout
)
_fake.start()
return _fake
def get_config_str(zk: KazooClient) -> str:
"""
Return decoded contents of /keeper/config node
"""
return zk.get("/keeper/config")[0].decode("utf-8")
def configs_equal(left: str, right: str) -> bool:
"""
Check whether two /keeper/config contents are equal, ignoring line order
"""
return sorted(left.split("\n")) == sorted(right.split("\n"))
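A short usage sketch tying these helpers to kazoo's reconfig() call, mirroring how the integration tests below use them (cluster, node1 and node2 are assumed to come from helpers.cluster, as in those tests):
zk1 = get_fake_zk(cluster, node1)
old_config = get_config_str(zk1)

# kazoo's reconfig() returns the updated config as bytes plus a ZnodeStat
new_config, _ = zk1.reconfig(joining="server.2=node2:9234", leaving=None, new_members=None)
new_config = new_config.decode("utf-8")

zk2 = get_fake_zk(cluster, node2)
assert configs_equal(new_config, get_config_str(zk2))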

View File

@ -1,12 +1,7 @@
#!/usr/bin/env python3
import pytest
from helpers.cluster import ClickHouseCluster
import random
import string
import os
import time
from multiprocessing.dummy import Pool

View File

@ -0,0 +1,20 @@
<clickhouse>
<keeper_server>
<enable_reconfiguration>true</enable_reconfiguration>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server> <id>1</id> <hostname>node1</hostname> <port>9234</port> </server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,21 @@
<clickhouse>
<keeper_server>
<enable_reconfiguration>true</enable_reconfiguration>
<tcp_port>9181</tcp_port>
<server_id>2</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server> <id>1</id> <hostname>node1</hostname> <port>9234</port> </server>
<server> <id>2</id> <hostname>node2</hostname> <port>9234</port> </server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,22 @@
<clickhouse>
<keeper_server>
<enable_reconfiguration>true</enable_reconfiguration>
<tcp_port>9181</tcp_port>
<server_id>3</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server> <id>1</id> <hostname>node1</hostname> <port>9234</port> </server>
<server> <id>2</id> <hostname>node2</hostname> <port>9234</port> </server>
<server> <id>3</id> <hostname>node3</hostname> <port>9234</port> </server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,155 @@
#!/usr/bin/env python3
import pytest
from helpers.cluster import ClickHouseCluster
import helpers.keeper_utils as ku
import os
from kazoo.client import KazooClient
from kazoo.exceptions import BadArgumentsException
cluster = ClickHouseCluster(__file__)
CONFIG_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "configs")
node1 = cluster.add_instance("node1", main_configs=["configs/keeper1.xml"])
node2 = cluster.add_instance("node2", stay_alive=True)
node3 = cluster.add_instance("node3", stay_alive=True)
server_join_msg = "confirms it will join"
part_of_cluster = "now this node is the part of cluster"
zk1, zk2, zk3 = None, None, None
def get_fake_zk(node):
return ku.get_fake_zk(cluster, node)
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
node2.stop_clickhouse()
node2.copy_file_to_container(
os.path.join(CONFIG_DIR, "keeper2.xml"),
"/etc/clickhouse-server/config.d/keeper.xml",
)
node3.stop_clickhouse()
node3.copy_file_to_container(
os.path.join(CONFIG_DIR, "keeper3.xml"),
"/etc/clickhouse-server/config.d/keeper.xml",
)
yield cluster
finally:
for conn in [zk1, zk2, zk3]:
if conn:
conn.stop()
conn.close()
cluster.shutdown()
def test_reconfig_add(started_cluster):
"""
Add a second node to a one-node cluster, then add a third node to the resulting two-node cluster.
"""
zk1 = get_fake_zk(node1)
config = ku.get_config_str(zk1)
print("Initial config", config)
assert len(config.split("\n")) == 1
assert "node1" in config
assert "node2" not in config
assert "node3" not in config
with pytest.raises(BadArgumentsException):
# duplicate id with different endpoint
zk1.reconfig(joining="server.1=localhost:1337", leaving=None, new_members=None)
with pytest.raises(BadArgumentsException):
# duplicate endpoint
zk1.reconfig(joining="server.8=node1:9234", leaving=None, new_members=None)
for i in range(100):
zk1.create(f"/test_three_{i}", b"somedata")
node2.start_clickhouse()
config, _ = zk1.reconfig(
joining="server.2=node2:9234", leaving=None, new_members=None
)
ku.wait_until_connected(cluster, node2)
config = config.decode("utf-8")
print("After adding 2", config)
assert len(config.split("\n")) == 2
assert "node1" in config
assert "node2" in config
assert "node3" not in config
zk2 = get_fake_zk(node2)
assert ku.configs_equal(config, ku.get_config_str(zk2))
for i in range(100):
assert zk2.exists(f"/test_three_{i}") is not None
zk2.create(f"/test_three_{100 + i}", b"somedata")
# Why not both?
# One node will process the add_srv request, while the other will pull the updated config,
# apply it, and return true in its config update thread (without calling add_srv again)
assert node1.contains_in_log(server_join_msg) or node2.contains_in_log(
server_join_msg
)
assert node2.contains_in_log(part_of_cluster)
zk1.stop()
zk1.close()
zk1 = get_fake_zk(node1)
zk1.sync("/test_three_0")
for i in range(200):
assert zk1.exists(f"/test_three_{i}") is not None
for i in range(100):
zk2.create(f"/test_four_{i}", b"somedata")
node3.start_clickhouse()
config, _ = zk2.reconfig(
joining="server.3=node3:9234", leaving=None, new_members=None
)
ku.wait_until_connected(cluster, node3)
config = config.decode("utf-8")
print("After adding 3", config)
assert len(config.split("\n")) == 3
assert "node1" in config
assert "node2" in config
assert "node3" in config
zk3 = get_fake_zk(node3)
assert ku.configs_equal(config, ku.get_config_str(zk3))
for i in range(100):
assert zk3.exists(f"/test_four_{i}") is not None
zk3.create(f"/test_four_{100 + i}", b"somedata")
zk1.stop()
zk1.close()
zk1 = get_fake_zk(node1)
zk1.sync("/test_four_0")
zk2.stop()
zk2.close()
zk2 = get_fake_zk(node2)
zk2.sync("/test_four_0")
for i in range(200):
assert zk1.exists(f"/test_four_{i}") is not None
assert zk2.exists(f"/test_four_{i}") is not None
assert node3.contains_in_log(part_of_cluster)

View File

@ -0,0 +1,37 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<enable_reconfiguration>true</enable_reconfiguration>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>9234</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
<priority>0</priority>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
<priority>0</priority>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,37 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>2</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<enable_reconfiguration>true</enable_reconfiguration>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>9234</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
<priority>0</priority>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
<priority>0</priority>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,37 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>3</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<enable_reconfiguration>true</enable_reconfiguration>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>9234</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
<priority>0</priority>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
<priority>0</priority>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,145 @@
#!/usr/bin/env python3
import pytest
from helpers.cluster import ClickHouseCluster
import helpers.keeper_utils as ku
import os
from kazoo.client import KazooClient
from kazoo.exceptions import BadVersionException, BadArgumentsException
cluster = ClickHouseCluster(__file__)
CONFIG_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "configs")
node1 = cluster.add_instance("node1", main_configs=["configs/keeper1.xml"])
node2 = cluster.add_instance("node2", main_configs=["configs/keeper2.xml"])
node3 = cluster.add_instance("node3", main_configs=["configs/keeper3.xml"])
log_msg_removed = "has been removed from the cluster"
zk1, zk2, zk3 = None, None, None
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
for conn in [zk1, zk2, zk3]:
if conn:
conn.stop()
conn.close()
cluster.shutdown()
def get_fake_zk(node):
return ku.get_fake_zk(cluster, node)
def test_reconfig_remove_followers_from_3(started_cluster):
"""
Remove 1 follower node from a cluster of 3.
Then remove another follower from the remaining two nodes.
Check that the remaining node is in standalone mode.
"""
zk1 = get_fake_zk(node1)
config, _ = zk1.get("/keeper/config")
config = config.decode("utf-8")
print("Initial config", config)
assert len(config.split("\n")) == 3
assert "node1" in config
assert "node2" in config
assert "node3" in config
with pytest.raises(BadVersionException):
zk1.reconfig(joining=None, leaving="1", new_members=None, from_config=20)
with pytest.raises(BadArgumentsException):
zk1.reconfig(joining=None, leaving=None, new_members=None)
with pytest.raises(BadArgumentsException):
# bulk reconfiguration is not supported
zk1.reconfig(joining=None, leaving=None, new_members="3")
with pytest.raises(BadArgumentsException):
zk1.reconfig(joining="1", leaving="1", new_members="3")
with pytest.raises(BadArgumentsException):
# at least one node must be left
zk1.reconfig(joining=None, leaving="1,2,3", new_members=None)
for i in range(100):
zk1.create(f"/test_two_{i}", b"somedata")
zk2 = get_fake_zk(node2)
zk2.sync("/test_two_0")
assert ku.configs_equal(config, ku.get_config_str(zk2))
zk3 = get_fake_zk(node3)
zk3.sync("/test_two_0")
assert ku.configs_equal(config, ku.get_config_str(zk3))
for i in range(100):
assert zk2.exists(f"test_two_{i}") is not None
assert zk3.exists(f"test_two_{i}") is not None
config, _ = zk1.reconfig(joining=None, leaving="3", new_members=None)
config = config.decode("utf-8")
print("After removing 3", config)
assert len(config.split("\n")) == 2
assert "node1" in config
assert "node2" in config
assert "node3" not in config
zk2.stop()
zk2.close()
zk2 = get_fake_zk(node2)
assert ku.configs_equal(config, ku.get_config_str(zk2))
for i in range(100):
assert zk2.exists(f"test_two_{i}") is not None
zk2.create(f"/test_two_{100 + i}", b"otherdata")
zk1.stop()
zk1.close()
zk1 = get_fake_zk(node1)
zk1.sync("/test_two_0")
for i in range(200):
assert zk1.exists(f"test_two_{i}") is not None
with pytest.raises(Exception):
zk3.stop()
zk3.close()
zk3 = get_fake_zk(node3)
zk3.sync("/test_two_0")
assert node3.contains_in_log(log_msg_removed)
for i in range(100):
zk2.create(f"/test_two_{200 + i}", b"otherdata")
config, _ = zk1.reconfig(joining=None, leaving="2", new_members=None)
config = config.decode("utf-8")
print("After removing 2", config)
assert len(config.split("\n")) == 1
assert "node1" in config
assert "node2" not in config
assert "node3" not in config
zk1.stop()
zk1.close()
zk1 = get_fake_zk(node1)
zk1.sync("/test_two_0")
for i in range(300):
assert zk1.exists(f"test_two_{i}") is not None
with pytest.raises(Exception):
zk2.stop()
zk2.close()
zk2 = get_fake_zk(node2)
zk2.sync("/test_two_0")
assert not node1.contains_in_log(log_msg_removed)
assert node2.contains_in_log(log_msg_removed)
assert "Mode: standalone" in zk1.command(b"stat")

View File

@ -0,0 +1,47 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<enable_reconfiguration>true</enable_reconfiguration>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>9234</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>4</id>
<hostname>node4</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>5</id>
<hostname>node5</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,47 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>2</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<enable_reconfiguration>true</enable_reconfiguration>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>9234</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>4</id>
<hostname>node4</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>5</id>
<hostname>node5</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,47 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>3</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<enable_reconfiguration>true</enable_reconfiguration>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>9234</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>4</id>
<hostname>node4</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>5</id>
<hostname>node5</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,47 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>4</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<enable_reconfiguration>true</enable_reconfiguration>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>9234</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>4</id>
<hostname>node4</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>5</id>
<hostname>node5</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,47 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>5</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<enable_reconfiguration>true</enable_reconfiguration>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>9234</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>4</id>
<hostname>node4</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>5</id>
<hostname>node5</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,149 @@
#!/usr/bin/env python3
import pytest
from helpers.cluster import ClickHouseCluster
import helpers.keeper_utils as ku
import os
from kazoo.client import KazooClient, KazooState
from kazoo.exceptions import BadVersionException, BadArgumentsException
cluster = ClickHouseCluster(__file__)
CONFIG_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "configs")
nodes = [
cluster.add_instance(f"node{i}", main_configs=[f"configs/keeper{i}.xml"])
for i in range(1, 6)
]
node1, node2, node3, node4, node5 = nodes
log_msg_removed = "has been removed from the cluster"
zk1, zk2, zk3, zk4, zk5 = None, None, None, None, None
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
for conn in [zk1, zk2, zk3, zk4, zk5]:
if conn:
conn.stop()
conn.close()
cluster.shutdown()
def get_fake_zk(node):
return ku.get_fake_zk(cluster, node)
def test_reconfig_remove_2_and_leader(started_cluster):
"""
Remove 2 followers from a cluster of 5, then remove the leader from the remaining 3 nodes.
"""
zk1 = get_fake_zk(node1)
config = ku.get_config_str(zk1)
print("Initial config", config)
assert len(config.split("\n")) == 5
for i in range(100):
zk1.create(f"/test_two_{i}", b"somedata")
zk4 = get_fake_zk(node4)
zk4.sync("/test_two_0")
assert ku.configs_equal(config, ku.get_config_str(zk4))
zk5 = get_fake_zk(node5)
zk5.sync("/test_two_0")
assert ku.configs_equal(config, ku.get_config_str(zk5))
for i in range(100):
assert zk4.exists(f"test_two_{i}") is not None
assert zk5.exists(f"test_two_{i}") is not None
zk4.create(f"/test_two_{100 + i}", b"otherdata")
zk2 = get_fake_zk(node2)
config, _ = zk2.reconfig(joining=None, leaving="4,5", new_members=None)
config = config.decode("utf-8")
print("After removing 4,5", config)
assert len(config.split("\n")) == 3
assert "node1" in config
assert "node2" in config
assert "node3" in config
assert "node4" not in config
assert "node5" not in config
zk1.stop()
zk1.close()
zk1 = get_fake_zk(node1)
zk1.sync("/test_two_0")
assert ku.configs_equal(config, ku.get_config_str(zk1))
for i in range(200):
assert zk1.exists(f"test_two_{i}") is not None
assert zk2.exists(f"test_two_{i}") is not None
with pytest.raises(Exception):
zk4.stop()
zk4.close()
zk4 = get_fake_zk(node4)
zk4.sync("/test_two_0")
with pytest.raises(Exception):
zk5.stop()
zk5.close()
zk5 = get_fake_zk(node5)
zk5.sync("/test_two_0")
assert not node1.contains_in_log(log_msg_removed)
assert not node2.contains_in_log(log_msg_removed)
assert not node3.contains_in_log(log_msg_removed)
assert node4.contains_in_log(log_msg_removed)
assert node5.contains_in_log(log_msg_removed)
assert ku.is_leader(cluster, node1)
for i in range(100):
zk1.create(f"/test_leader_{i}", b"somedata")
# when a leader gets a remove request, it must yield leadership
config, _ = zk1.reconfig(joining=None, leaving="1", new_members=None)
config = config.decode("utf-8")
print("After removing 1 (leader)", config)
assert len(config.split("\n")) == 2
assert "node1" not in config
assert "node2" in config
assert "node3" in config
assert "node4" not in config
assert "node5" not in config
zk2.stop()
zk2.close()
zk2 = get_fake_zk(node2)
zk2.sync("/test_leader_0")
assert ku.configs_equal(config, ku.get_config_str(zk2))
zk3 = get_fake_zk(node3)
zk3.sync("/test_leader_0")
assert ku.configs_equal(config, ku.get_config_str(zk3))
for i in range(100):
assert zk2.exists(f"test_leader_{i}") is not None
assert zk3.exists(f"test_leader_{i}") is not None
with pytest.raises(Exception):
zk1.stop()
zk1.close()
zk1 = get_fake_zk(node1)
zk1.sync("/test_leader_0")
assert node1.contains_in_log(log_msg_removed)
assert not node2.contains_in_log(log_msg_removed)
assert not node3.contains_in_log(log_msg_removed)

View File

@ -0,0 +1,35 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<enable_reconfiguration>true</enable_reconfiguration>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>9234</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,35 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>2</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<enable_reconfiguration>true</enable_reconfiguration>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>9234</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,35 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>3</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<enable_reconfiguration>true</enable_reconfiguration>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>9234</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,21 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>4</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<enable_reconfiguration>true</enable_reconfiguration>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server> <id>2</id> <hostname>node2</hostname> <port>9234</port> </server>
<server> <id>3</id> <hostname>node3</hostname> <port>9234</port> </server>
<server> <id>4</id> <hostname>node4</hostname> <port>9234</port> </server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,127 @@
#!/usr/bin/env python3
import pytest
from helpers.cluster import ClickHouseCluster
from os.path import join, dirname, realpath
import time
import helpers.keeper_utils as ku
from kazoo.client import KazooClient, KazooState
cluster = ClickHouseCluster(__file__)
CONFIG_DIR = join(dirname(realpath(__file__)), "configs")
node1 = cluster.add_instance("node1", main_configs=["configs/keeper1.xml"])
node2 = cluster.add_instance("node2", main_configs=["configs/keeper2.xml"])
node3 = cluster.add_instance("node3", main_configs=["configs/keeper3.xml"])
node4 = cluster.add_instance("node4", stay_alive=True)
zk1, zk2, zk3, zk4 = None, None, None, None
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
node4.stop_clickhouse()
node4.copy_file_to_container(
join(CONFIG_DIR, "keeper4.xml"),
"/etc/clickhouse-server/config.d/keeper.xml",
)
yield cluster
finally:
for conn in [zk1, zk2, zk3, zk4]:
if conn:
conn.stop()
conn.close()
cluster.shutdown()
def get_fake_zk(node):
return ku.get_fake_zk(cluster, node)
def test_reconfig_replace_leader(started_cluster):
"""
Remove the leader from a cluster of 3, then add a new node, using two separate reconfig commands.
"""
zk1 = get_fake_zk(node1)
config = ku.get_config_str(zk1)
assert len(config.split("\n")) == 3
assert "node1" in config
assert "node2" in config
assert "node3" in config
assert "node4" not in config
for i in range(100):
zk1.create(f"/test_four_{i}", b"somedata")
zk2 = get_fake_zk(node2)
zk2.sync("/test_four_0")
assert ku.configs_equal(config, ku.get_config_str(zk2))
zk3 = get_fake_zk(node3)
zk3.sync("/test_four_0")
assert ku.configs_equal(config, ku.get_config_str(zk3))
for i in range(100):
assert zk2.exists(f"/test_four_{i}") is not None
assert zk3.exists(f"/test_four_{i}") is not None
assert ku.is_leader(cluster, node1)
config, _ = zk2.reconfig(joining=None, leaving="1", new_members=None)
config = config.decode("utf-8")
print("After removing 1 (leader)", config)
assert len(config.split("\n")) == 2
assert "node1" not in config
assert "node2" in config
assert "node3" in config
assert "node4" not in config
with pytest.raises(Exception):
zk1.stop()
zk1.close()
zk1 = get_fake_zk(node1)
zk1.sync("/test_four_0")
node4.start_clickhouse()
config, _ = zk2.reconfig(
joining="server.4=node4:9234", leaving=None, new_members=None
)
config = config.decode("utf-8")
ku.wait_until_connected(cluster, node4)
print("After adding 4", config)
assert len(config.split("\n")) == 3
assert "node1" not in config
assert "node2" in config
assert "node3" in config
assert "node4" in config
zk4 = get_fake_zk(node4)
assert ku.configs_equal(config, ku.get_config_str(zk4))
for i in range(100):
assert zk4.exists(f"test_four_{i}") is not None
zk4.create(f"/test_four_{100 + i}", b"somedata")
zk2.stop()
zk2.close()
zk2 = get_fake_zk(node2)
zk2.sync("/test_four_0")
assert ku.configs_equal(config, ku.get_config_str(zk2))
zk3.stop()
zk3.close()
zk3 = get_fake_zk(node3)
zk3.sync("/test_four_0")
assert ku.configs_equal(config, ku.get_config_str(zk3))
for i in range(200):
assert zk2.exists(f"test_four_{i}") is not None
assert zk3.exists(f"test_four_{i}") is not None

View File

@ -0,0 +1,35 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<enable_reconfiguration>true</enable_reconfiguration>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>9234</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,35 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>2</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<enable_reconfiguration>true</enable_reconfiguration>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>9234</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,35 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>3</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<enable_reconfiguration>true</enable_reconfiguration>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>9234</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@@ -0,0 +1,21 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>4</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<enable_reconfiguration>true</enable_reconfiguration>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server> <id>2</id> <hostname>node2</hostname> <port>9234</port> </server>
<server> <id>3</id> <hostname>node3</hostname> <port>9234</port> </server>
<server> <id>4</id> <hostname>node4</hostname> <port>9234</port> </server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@@ -0,0 +1,120 @@
#!/usr/bin/env python3
import pytest
from helpers.cluster import ClickHouseCluster
from os.path import join, dirname, realpath
import time
import helpers.keeper_utils as ku
from kazoo.client import KazooClient, KazooState

cluster = ClickHouseCluster(__file__)
CONFIG_DIR = join(dirname(realpath(__file__)), "configs")

node1 = cluster.add_instance("node1", main_configs=["configs/keeper1.xml"])
node2 = cluster.add_instance("node2", main_configs=["configs/keeper2.xml"])
node3 = cluster.add_instance("node3", main_configs=["configs/keeper3.xml"])
node4 = cluster.add_instance("node4", stay_alive=True)
zk1, zk2, zk3, zk4 = None, None, None, None


@pytest.fixture(scope="module")
def started_cluster():
    try:
        cluster.start()

        node4.stop_clickhouse()
        node4.copy_file_to_container(
            join(CONFIG_DIR, "keeper4.xml"),
            "/etc/clickhouse-server/config.d/keeper.xml",
        )

        yield cluster

    finally:
        for conn in [zk1, zk2, zk3, zk4]:
            if conn:
                conn.stop()
                conn.close()

        cluster.shutdown()


def get_fake_zk(node):
    return ku.get_fake_zk(cluster, node)


def test_reconfig_replace_leader_in_one_command(started_cluster):
    """
    Remove leader from a cluster of 3 and add a new node to this cluster in a single command
    """

    zk1 = get_fake_zk(node1)
    config = ku.get_config_str(zk1)

    assert len(config.split("\n")) == 3
    assert "node1" in config
    assert "node2" in config
    assert "node3" in config
    assert "node4" not in config
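
    # seed some znodes so that replication to every ensemble member can be checked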
    for i in range(100):
        zk1.create(f"/test_four_{i}", b"somedata")

    zk2 = get_fake_zk(node2)
    zk2.sync("/test_four_0")
    assert ku.configs_equal(config, ku.get_config_str(zk2))

    zk3 = get_fake_zk(node3)
    zk3.sync("/test_four_0")
    assert ku.configs_equal(config, ku.get_config_str(zk3))

    for i in range(100):
        assert zk2.exists(f"/test_four_{i}") is not None
        assert zk3.exists(f"/test_four_{i}") is not None

    assert ku.is_leader(cluster, node1)
    node4.start_clickhouse()

    config, _ = zk2.reconfig(
        joining="server.4=node4:9234", leaving="1", new_members=None
    )
    config = config.decode("utf-8")
    print("After removing 1 and adding 4", config)

    assert len(config.split("\n")) == 3
    assert "node1" not in config
    assert "node2" in config
    assert "node3" in config
    assert "node4" in config

    ku.wait_until_connected(cluster, node4)
    time.sleep(1)

    zk4 = get_fake_zk(node4)
    zk4.sync("/test_four_0")
    assert ku.configs_equal(config, ku.get_config_str(zk4))

    for i in range(100):
        assert zk4.exists(f"test_four_{i}") is not None
        zk4.create(f"/test_four_{100 + i}", b"somedata")

    with pytest.raises(Exception):
        zk1.stop()
        zk1.close()
        zk1 = get_fake_zk(node1)
        zk1.sync("/test_four_0")

    zk2.stop()
    zk2.close()
    zk2 = get_fake_zk(node2)
    zk2.sync("/test_four_0")
    assert ku.configs_equal(config, ku.get_config_str(zk2))

    zk3.stop()
    zk3.close()
    zk3 = get_fake_zk(node3)
    zk3.sync("/test_four_0")
    assert ku.configs_equal(config, ku.get_config_str(zk3))

    for i in range(200):
        assert zk2.exists(f"test_four_{i}") is not None
        assert zk3.exists(f"test_four_{i}") is not None

View File

@@ -15,7 +15,7 @@ using namespace DB;
void dumpMachine(std::shared_ptr<KeeperStateMachine> machine)
{
auto & storage = machine->getStorage();
auto & storage = machine->getStorageForUnitTests();
std::queue<std::string> keys;
keys.push("/");