Merge pull request #54201 from ClickHouse/pufit/keeper-client-reconfig

Implementing `reconfig`, `sync`, `exists` commands for keeper-client
Commit a54a7b726d by pufit, 2023-09-12 10:58:02 -04:00, committed by GitHub.
20 changed files with 649 additions and 495 deletions


@@ -16,6 +16,8 @@ A client application to interact with clickhouse-keeper by its native protocol.
- `--session-timeout=TIMEOUT` — Set session timeout in seconds. Default value: 10s.
- `--operation-timeout=TIMEOUT` — Set operation timeout in seconds. Default value: 10s.
- `--history-file=FILE_PATH` — Set path of history file. Default value: `~/.keeper-client-history`.
- `--log-level=LEVEL` — Set log level. Default value: `information`.
- `--no-confirmation` — If set, will not require a confirmation on several commands. Default value: `false` for interactive mode and `true` for query mode.
- `--help` — Shows the help message.

## Example {#clickhouse-keeper-client-example}
@@ -44,6 +46,7 @@ keeper foo bar
- `ls [path]` -- Lists the nodes for the given path (default: cwd)
- `cd [path]` -- Changes the working path (default `.`)
- `exists <path>` -- Returns `1` if the node exists, `0` otherwise
- `set <path> <value> [version]` -- Updates the node's value. Only updates if the version matches (default: -1)
- `create <path> <value> [mode]` -- Creates a new node with the set value
- `touch <path>` -- Creates a new node with an empty string as value. Doesn't throw an exception if the node already exists
@@ -56,3 +59,5 @@ keeper foo bar
- `find_super_nodes <threshold> [path]` -- Finds nodes with a number of children larger than some threshold for the given path (default `.`)
- `delete_stale_backups` -- Deletes ClickHouse nodes used for backups that are now inactive
- `find_big_family [path] [n]` -- Returns the top n nodes with the biggest family in the subtree (default path = `.` and n = 10)
- `sync <path>` -- Synchronizes the node between processes and the leader
- `reconfig <add|remove|set> "<arg>" [version]` -- Reconfigures the Keeper cluster. See https://clickhouse.com/docs/en/guides/sre/keeper/clickhouse-keeper#reconfiguration
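For orientation, here is a minimal, hedged sketch of how the three new commands could be exercised from an integration test using the `KeeperClient` helper this PR adds to `helpers.keeper_utils`. The running `cluster` object, the node name `zoo1`, and the `server.2=node2:9234` membership string are assumptions borrowed from the tests further below.

```python
import helpers.keeper_utils as ku

# Assumes a started ClickHouseCluster named `cluster` with a Keeper node "zoo1".
with ku.KeeperClient.from_cluster(cluster, "zoo1") as client:
    client.create("/demo", "v1")
    assert client.exists("/demo")   # `exists` prints 1/0; the helper maps it to bool

    client.sync("/demo")            # wait until this replica has caught up with the leader

    # Illustrative only: add a second server to the ensemble. The argument format
    # ("server.<id>=<host>:<raft_port>") follows the test cases in this PR.
    new_config = client.reconfig(
        joining="server.2=node2:9234", leaving=None, new_members=None
    )
    print(new_config)
```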


@@ -9,11 +9,11 @@ namespace DB
bool LSCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const
{
    String path;
    if (!parseKeeperPath(pos, expected, path))
        return true;

    node->args.push_back(std::move(path));
    return true;
}
@@ -42,11 +42,11 @@ void LSCommand::execute(const ASTKeeperQuery * query, KeeperClient * client) const
bool CDCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const
{
    String path;
    if (!parseKeeperPath(pos, expected, path))
        return true;

    node->args.push_back(std::move(path));
    return true;
}
@@ -64,11 +64,12 @@ void CDCommand::execute(const ASTKeeperQuery * query, KeeperClient * client) const
bool SetCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const
{
    String path;
    if (!parseKeeperPath(pos, expected, path))
        return false;
    node->args.push_back(std::move(path));

    String arg;
    if (!parseKeeperArg(pos, expected, arg))
        return false;
    node->args.push_back(std::move(arg));
@@ -93,11 +94,12 @@ void SetCommand::execute(const ASTKeeperQuery * query, KeeperClient * client) const
bool CreateCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const
{
    String path;
    if (!parseKeeperPath(pos, expected, path))
        return false;
    node->args.push_back(std::move(path));

    String arg;
    if (!parseKeeperArg(pos, expected, arg))
        return false;
    node->args.push_back(std::move(arg));
@@ -143,10 +145,10 @@ void TouchCommand::execute(const ASTKeeperQuery * query, KeeperClient * client) const
bool GetCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const
{
    String path;
    if (!parseKeeperPath(pos, expected, path))
        return false;

    node->args.push_back(std::move(path));
    return true;
}
@@ -156,13 +158,28 @@ void GetCommand::execute(const ASTKeeperQuery * query, KeeperClient * client) const
    std::cout << client->zookeeper->get(client->getAbsolutePath(query->args[0].safeGet<String>())) << "\n";
}

bool ExistsCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, DB::Expected & expected) const
{
    String path;
    if (!parseKeeperPath(pos, expected, path))
        return false;

    node->args.push_back(std::move(path));
    return true;
}

void ExistsCommand::execute(const DB::ASTKeeperQuery * query, DB::KeeperClient * client) const
{
    std::cout << client->zookeeper->exists(client->getAbsolutePath(query->args[0].safeGet<String>())) << "\n";
}

bool GetStatCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const
{
    String path;
    if (!parseKeeperPath(pos, expected, path))
        return true;

    node->args.push_back(std::move(path));
    return true;
}
@@ -325,10 +342,10 @@ void FindBigFamily::execute(const ASTKeeperQuery * query, KeeperClient * client) const
bool RMCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const
{
    String path;
    if (!parseKeeperPath(pos, expected, path))
        return false;

    node->args.push_back(std::move(path));
    return true;
}
@@ -340,10 +357,10 @@ void RMCommand::execute(const ASTKeeperQuery * query, KeeperClient * client) const
bool RMRCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const
{
    String path;
    if (!parseKeeperPath(pos, expected, path))
        return false;

    node->args.push_back(std::move(path));
    return true;
}
@@ -355,6 +372,70 @@ void RMRCommand::execute(const ASTKeeperQuery * query, KeeperClient * client) const
        [client, path]{ client->zookeeper->removeRecursive(path); });
}

bool ReconfigCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, DB::Expected & expected) const
{
    ReconfigCommand::Operation operation;
    if (ParserKeyword{"ADD"}.ignore(pos, expected))
        operation = ReconfigCommand::Operation::ADD;
    else if (ParserKeyword{"REMOVE"}.ignore(pos, expected))
        operation = ReconfigCommand::Operation::REMOVE;
    else if (ParserKeyword{"SET"}.ignore(pos, expected))
        operation = ReconfigCommand::Operation::SET;
    else
        return false;

    node->args.push_back(operation);
    ParserToken{TokenType::Whitespace}.ignore(pos);

    String arg;
    if (!parseKeeperArg(pos, expected, arg))
        return false;
    node->args.push_back(std::move(arg));

    return true;
}

void ReconfigCommand::execute(const DB::ASTKeeperQuery * query, DB::KeeperClient * client) const
{
    String joining;
    String leaving;
    String new_members;

    auto operation = query->args[0].get<ReconfigCommand::Operation>();
    switch (operation)
    {
        case static_cast<UInt8>(ReconfigCommand::Operation::ADD):
            joining = query->args[1].safeGet<DB::String>();
            break;
        case static_cast<UInt8>(ReconfigCommand::Operation::REMOVE):
            leaving = query->args[1].safeGet<DB::String>();
            break;
        case static_cast<UInt8>(ReconfigCommand::Operation::SET):
            new_members = query->args[1].safeGet<DB::String>();
            break;
        default:
            UNREACHABLE();
    }

    auto response = client->zookeeper->reconfig(joining, leaving, new_members);
    std::cout << response.value << '\n';
}

bool SyncCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, DB::Expected & expected) const
{
    String path;
    if (!parseKeeperPath(pos, expected, path))
        return false;

    node->args.push_back(std::move(path));
    return true;
}

void SyncCommand::execute(const DB::ASTKeeperQuery * query, DB::KeeperClient * client) const
{
    std::cout << client->zookeeper->sync(client->getAbsolutePath(query->args[0].safeGet<String>())) << "\n";
}

bool HelpCommand::parse(IParser::Pos & /* pos */, std::shared_ptr<ASTKeeperQuery> & /* node */, Expected & /* expected */) const
{
    return true;
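As a reading aid (not part of the PR), here is a hedged Python analogue of the dispatch in `ReconfigCommand::execute`: each sub-command fills exactly one of the three arguments that are eventually passed to `ZooKeeper::reconfig`. The example strings come from the integration tests in this diff.

```python
def build_reconfig_args(operation: str, arg: str) -> dict:
    """Mirror of the ADD/REMOVE/SET switch: one argument is set, the others stay empty."""
    joining = leaving = new_members = ""
    if operation == "add":
        joining = arg          # e.g. "server.3=node3:9234"
    elif operation == "remove":
        leaving = arg          # e.g. "3" or "4,5" (comma-separated server ids)
    elif operation == "set":
        new_members = arg      # full replacement membership list
    else:
        raise ValueError(f"unknown reconfig operation: {operation}")
    return {"joining": joining, "leaving": leaving, "new_members": new_members}


assert build_reconfig_args("remove", "4,5") == {
    "joining": "", "leaving": "4,5", "new_members": ""
}
```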


@@ -101,6 +101,17 @@ class GetCommand : public IKeeperClientCommand
    String getHelpMessage() const override { return "{} <path> -- Returns the node's value"; }
};

class ExistsCommand : public IKeeperClientCommand
{
    String getName() const override { return "exists"; }

    bool parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const override;

    void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;

    String getHelpMessage() const override { return "{} <path> -- Returns `1` if node exists, `0` otherwise"; }
};

class GetStatCommand : public IKeeperClientCommand
{
    String getName() const override { return "get_stat"; }
@@ -177,6 +188,35 @@ class RMRCommand : public IKeeperClientCommand
    String getHelpMessage() const override { return "{} <path> -- Recursively deletes path. Confirmation required"; }
};

class ReconfigCommand : public IKeeperClientCommand
{
    enum class Operation : UInt8
    {
        ADD = 0,
        REMOVE = 1,
        SET = 2,
    };

    String getName() const override { return "reconfig"; }

    bool parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const override;

    void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;

    String getHelpMessage() const override { return "{} <add|remove|set> \"<arg>\" [version] -- Reconfigure Keeper cluster. See https://clickhouse.com/docs/en/guides/sre/keeper/clickhouse-keeper#reconfiguration"; }
};

class SyncCommand : public IKeeperClientCommand
{
    String getName() const override { return "sync"; }

    bool parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const override;

    void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;

    String getHelpMessage() const override { return "{} <path> -- Synchronizes node between processes and leader"; }
};

class HelpCommand : public IKeeperClientCommand
{
    String getName() const override { return "help"; }


@@ -84,8 +84,11 @@ std::vector<String> KeeperClient::getCompletions(const String & prefix) const
void KeeperClient::askConfirmation(const String & prompt, std::function<void()> && callback)
{
    if (!ask_confirmation)
        return callback();

    std::cout << prompt << " Continue?\n";
    waiting_confirmation = true;
    confirmation_callback = callback;
}
@@ -170,6 +173,14 @@ void KeeperClient::defineOptions(Poco::Util::OptionSet & options)
        Poco::Util::Option("log-level", "", "set log level")
            .argument("<level>")
            .binding("log-level"));

    options.addOption(
        Poco::Util::Option("no-confirmation", "", "if set, will not require a confirmation on several commands. default false for interactive and true for query")
            .binding("no-confirmation"));

    options.addOption(
        Poco::Util::Option("tests-mode", "", "run keeper-client in a special mode for tests. all commands output are separated by special symbols. default false")
            .binding("tests-mode"));
}

void KeeperClient::initialize(Poco::Util::Application & /* self */)
@@ -184,12 +195,15 @@ void KeeperClient::initialize(Poco::Util::Application & /* self */)
        std::make_shared<CreateCommand>(),
        std::make_shared<TouchCommand>(),
        std::make_shared<GetCommand>(),
        std::make_shared<ExistsCommand>(),
        std::make_shared<GetStatCommand>(),
        std::make_shared<FindSuperNodes>(),
        std::make_shared<DeleteStaleBackups>(),
        std::make_shared<FindBigFamily>(),
        std::make_shared<RMCommand>(),
        std::make_shared<RMRCommand>(),
        std::make_shared<ReconfigCommand>(),
        std::make_shared<SyncCommand>(),
        std::make_shared<HelpCommand>(),
        std::make_shared<FourLetterWordCommand>(),
    });
@@ -229,18 +243,6 @@ void KeeperClient::initialize(Poco::Util::Application & /* self */)
    EventNotifier::init();
}

void KeeperClient::executeQuery(const String & query)
{
    std::vector<String> queries;
    boost::algorithm::split(queries, query, boost::is_any_of(";"));

    for (const auto & query_text : queries)
    {
        if (!query_text.empty())
            processQueryText(query_text);
    }
}

bool KeeperClient::processQueryText(const String & text)
{
    if (exit_strings.find(text) != exit_strings.end())
@@ -248,29 +250,44 @@ bool KeeperClient::processQueryText(const String & text)
    try
    {
        if (waiting_confirmation)
        {
            waiting_confirmation = false;
            if (text.size() == 1 && (text == "y" || text == "Y"))
                confirmation_callback();
            return true;
        }

        KeeperParser parser;
        const char * begin = text.data();
        const char * end = begin + text.size();

        while (begin < end)
        {
            String message;
            ASTPtr res = tryParseQuery(
                parser,
                begin,
                end,
                /* out_error_message = */ message,
                /* hilite = */ true,
                /* description = */ "",
                /* allow_multi_statements = */ true,
                /* max_query_size = */ 0,
                /* max_parser_depth = */ 0,
                /* skip_insignificant = */ false);

            if (!res)
            {
                std::cerr << message << "\n";
                return true;
            }

            auto * query = res->as<ASTKeeperQuery>();

            auto command = KeeperClient::commands.find(query->command);
            command->second->execute(query, this);
        }
    }
    catch (Coordination::Exception & err)
    {
@@ -279,7 +296,7 @@ bool KeeperClient::processQueryText(const String & text)
    return true;
}

void KeeperClient::runInteractiveReplxx()
{
    LineReader::Patterns query_extenders = {"\\"};
@@ -299,7 +316,7 @@ void KeeperClient::runInteractive()
    while (true)
    {
        String prompt;
        if (waiting_confirmation)
            prompt = "[y/n] ";
        else
            prompt = cwd.string() + " :) ";
@@ -313,6 +330,26 @@
    }
}

void KeeperClient::runInteractiveInputStream()
{
    for (String input; std::getline(std::cin, input);)
    {
        if (!processQueryText(input))
            break;

        std::cout << "\a\a\a\a" << std::endl;
        std::cerr << std::flush;
    }
}

void KeeperClient::runInteractive()
{
    if (config().hasOption("tests-mode"))
        runInteractiveInputStream();
    else
        runInteractiveReplxx();
}

int KeeperClient::main(const std::vector<String> & /* args */)
{
    if (config().hasOption("help"))
@@ -362,8 +399,13 @@ int KeeperClient::main(const std::vector<String> & /* args */)
    zk_args.operation_timeout_ms = config().getInt("operation-timeout", 10) * 1000;
    zookeeper = std::make_unique<zkutil::ZooKeeper>(zk_args);

    if (config().has("no-confirmation") || config().has("query"))
        ask_confirmation = false;

    if (config().has("query"))
    {
        processQueryText(config().getString("query"));
    }
    else
        runInteractive();
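The new `--tests-mode` loop reads one query per line from stdin and terminates each command's output with a `\a\a\a\a` line, which is what the Python `KeeperClient` helper further below keys on. A minimal sketch of a consumer, assuming a `clickhouse` binary on `PATH` and a Keeper server at `localhost:9181` (both assumptions):

```python
import subprocess

SEPARATOR = b"\a\a\a\a\n"  # printed by runInteractiveInputStream after every command

proc = subprocess.Popen(
    ["clickhouse", "keeper-client", "--host", "localhost", "--port", "9181",
     "--log-level", "error", "--tests-mode", "--no-confirmation"],
    stdin=subprocess.PIPE, stdout=subprocess.PIPE,
)

proc.stdin.write(b"ls /\n")
proc.stdin.flush()

# Collect stdout lines until the separator marks the end of this command's output.
response = b""
while not response.endswith(SEPARATOR):
    response += proc.stdout.readline()
print(response[: -len(SEPARATOR)].decode().strip())

proc.communicate(b"exit\n", timeout=10.0)
```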


@@ -49,8 +49,10 @@ public:
protected:
    void runInteractive();
    void runInteractiveReplxx();
    void runInteractiveInputStream();

    bool processQueryText(const String & text);
    void executeQuery(const String & query);

    void loadCommands(std::vector<Command> && new_commands);
@@ -61,7 +63,8 @@ protected:
    zkutil::ZooKeeperArgs zk_args;

    bool ask_confirmation = true;
    bool waiting_confirmation = false;

    std::vector<String> registered_commands_and_four_letter_words;
};


@@ -7,43 +7,34 @@ namespace DB
bool parseKeeperArg(IParser::Pos & pos, Expected & expected, String & result)
{
    if (pos->type == TokenType::QuotedIdentifier || pos->type == TokenType::StringLiteral)
    {
        if (!parseIdentifierOrStringLiteral(pos, expected, result))
            return false;

        ParserToken{TokenType::Whitespace}.ignore(pos);
    }

    while (pos->type != TokenType::Whitespace && pos->type != TokenType::EndOfStream && pos->type != TokenType::Semicolon)
    {
        result.append(pos->begin, pos->end);
        ++pos;
    }

    ParserToken{TokenType::Whitespace}.ignore(pos);

    if (result.empty())
        return false;

    return true;
}

bool parseKeeperPath(IParser::Pos & pos, Expected & expected, String & path)
{
    expected.add(pos, "path");
    return parseKeeperArg(pos, expected, path);
}

bool KeeperParser::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
    auto query = std::make_shared<ASTKeeperQuery>();
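The practical effect of the parser change: a bare argument now ends at whitespace, a semicolon, or end of input (semicolons separate statements, since `processQueryText` now parses multi-statement input), while a quoted argument may contain those characters. A hedged illustration via the `KeeperClient` test helper, mirroring the updated `test_base_commands`; the running `cluster` object and the node name `zoo1` are assumptions:

```python
import helpers.keeper_utils as ku

with ku.KeeperClient.from_cluster(cluster, "zoo1") as client:
    # A bare word like 1=2 is taken verbatim up to the next whitespace/semicolon.
    client.create("/123", "1=2")
    assert client.get("/123") == "1=2"

    # Quoting lets the value contain a semicolon that would otherwise end the statement.
    client.create("/123/321", "'foo;bar'")
    assert client.get("/123/321") == "foo;bar"
```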


@@ -877,6 +877,24 @@ void ZooKeeper::handleEphemeralNodeExistence(const std::string & path, const std::string & fast_delete_if_equal_value)
    }
}

Coordination::ReconfigResponse ZooKeeper::reconfig(
    const std::string & joining,
    const std::string & leaving,
    const std::string & new_members,
    int32_t version)
{
    auto future_result = asyncReconfig(joining, leaving, new_members, version);

    if (future_result.wait_for(std::chrono::milliseconds(args.operation_timeout_ms)) != std::future_status::ready)
    {
        impl->finalize(fmt::format("Operation timeout on {}", Coordination::OpNum::Reconfig));
        throw KeeperException(Coordination::Error::ZOPERATIONTIMEOUT);
    }

    return future_result.get();
}

ZooKeeperPtr ZooKeeper::startNewSession() const
{
    return std::make_shared<ZooKeeper>(args, zk_log);
@@ -1226,6 +1244,27 @@ std::future<Coordination::SyncResponse> ZooKeeper::asyncSync(const std::string &
    return future;
}

std::future<Coordination::ReconfigResponse> ZooKeeper::asyncReconfig(
    const std::string & joining,
    const std::string & leaving,
    const std::string & new_members,
    int32_t version)
{
    auto promise = std::make_shared<std::promise<Coordination::ReconfigResponse>>();
    auto future = promise->get_future();

    auto callback = [promise](const Coordination::ReconfigResponse & response) mutable
    {
        if (response.error != Coordination::Error::ZOK)
            promise->set_exception(std::make_exception_ptr(KeeperException(response.error)));
        else
            promise->set_value(response);
    };

    impl->reconfig(joining, leaving, new_members, version, std::move(callback));
    return future;
}

void ZooKeeper::finalize(const String & reason)
{
    impl->finalize(reason);


@@ -449,6 +449,12 @@ public:
    /// disappear automatically after 3x session_timeout.
    void handleEphemeralNodeExistence(const std::string & path, const std::string & fast_delete_if_equal_value);

    Coordination::ReconfigResponse reconfig(
        const std::string & joining,
        const std::string & leaving,
        const std::string & new_members,
        int32_t version = -1);

    /// Async interface (a small subset of operations is implemented).
    ///
    /// Usage:
@@ -529,6 +535,13 @@ public:
        const std::string & path,
        Coordination::ListRequestType list_request_type = Coordination::ListRequestType::ALL);

    using FutureReconfig = std::future<Coordination::ReconfigResponse>;
    FutureReconfig asyncReconfig(
        const std::string & joining,
        const std::string & leaving,
        const std::string & new_members,
        int32_t version = -1);

    void finalize(const String & reason);

    void setZooKeeperLog(std::shared_ptr<DB::ZooKeeperLog> zk_log_);


@@ -1,6 +1,185 @@
import io
import subprocess
import socket
import time
import typing as tp
import contextlib
import select

from kazoo.client import KazooClient
from helpers.cluster import ClickHouseCluster, ClickHouseInstance
from helpers.client import CommandRequest


def execute_keeper_client_query(
    cluster: ClickHouseCluster, node: ClickHouseInstance, query: str
) -> str:
    request = CommandRequest(
        [
            cluster.server_bin_path,
            "keeper-client",
            "--host",
            str(cluster.get_instance_ip(node.name)),
            "--port",
            str(cluster.zookeeper_port),
            "-q",
            query,
        ],
        stdin="",
    )

    return request.get_answer()


class KeeperException(Exception):
    pass


class KeeperClient(object):
    SEPARATOR = b"\a\a\a\a\n"

    def __init__(self, bin_path: str, host: str, port: int):
        self.bin_path = bin_path
        self.host = host
        self.port = port

        self.proc = subprocess.Popen(
            [
                bin_path,
                "keeper-client",
                "--host",
                host,
                "--port",
                str(port),
                "--log-level",
                "error",
                "--tests-mode",
                "--no-confirmation",
            ],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )

        self.poller = select.epoll()
        self.poller.register(self.proc.stdout)
        self.poller.register(self.proc.stderr)

        self._fd_nums = {
            self.proc.stdout.fileno(): self.proc.stdout,
            self.proc.stderr.fileno(): self.proc.stderr,
        }

        self.stopped = False

    def execute_query(self, query: str, timeout: float = 10.0) -> str:
        output = io.BytesIO()

        self.proc.stdin.write(query.encode() + b"\n")
        self.proc.stdin.flush()

        events = self.poller.poll(timeout)
        if not events:
            raise TimeoutError(f"Keeper client returned no output")

        for fd_num, event in events:
            if event & (select.EPOLLIN | select.EPOLLPRI):
                file = self._fd_nums[fd_num]

                if file == self.proc.stdout:
                    while True:
                        chunk = file.readline()
                        if chunk.endswith(self.SEPARATOR):
                            break

                        output.write(chunk)

                elif file == self.proc.stderr:
                    assert self.proc.stdout.readline() == self.SEPARATOR
                    raise KeeperException(self.proc.stderr.readline().strip().decode())

            else:
                raise ValueError(f"Failed to read from pipe. Flag {event}")

        data = output.getvalue().strip().decode()
        return data

    def cd(self, path: str, timeout: float = 10.0):
        self.execute_query(f"cd {path}", timeout)

    def ls(self, path: str, timeout: float = 10.0) -> list[str]:
        return self.execute_query(f"ls {path}", timeout).split(" ")

    def create(self, path: str, value: str, timeout: float = 10.0):
        self.execute_query(f"create {path} {value}", timeout)

    def get(self, path: str, timeout: float = 10.0) -> str:
        return self.execute_query(f"get {path}", timeout)

    def exists(self, path: str, timeout: float = 10.0) -> bool:
        return bool(int(self.execute_query(f"exists {path}", timeout)))

    def stop(self):
        if not self.stopped:
            self.stopped = True
            self.proc.communicate(b"exit\n", timeout=10.0)

    def sync(self, path: str, timeout: float = 10.0):
        self.execute_query(f"sync {path}", timeout)

    def touch(self, path: str, timeout: float = 10.0):
        self.execute_query(f"touch {path}", timeout)

    def find_big_family(self, path: str, n: int = 10, timeout: float = 10.0) -> str:
        return self.execute_query(f"find_big_family {path} {n}", timeout)

    def find_super_nodes(self, threshold: int, timeout: float = 10.0) -> str:
        return self.execute_query(f"find_super_nodes {threshold}", timeout)

    def delete_stale_backups(self, timeout: float = 10.0) -> str:
        return self.execute_query("delete_stale_backups", timeout)

    def reconfig(
        self,
        joining: tp.Optional[str],
        leaving: tp.Optional[str],
        new_members: tp.Optional[str],
        timeout: float = 10.0,
    ) -> str:
        if bool(joining) + bool(leaving) + bool(new_members) != 1:
            raise ValueError(
                "Exactly one of joining, leaving or new_members must be specified"
            )

        if joining is not None:
            operation = "add"
        elif leaving is not None:
            operation = "remove"
        elif new_members is not None:
            operation = "set"
        else:
            raise ValueError(
                "At least one of joining, leaving or new_members must be specified"
            )

        return self.execute_query(
            f"reconfig {operation} {joining or leaving or new_members}", timeout
        )

    @classmethod
    @contextlib.contextmanager
    def from_cluster(
        cls, cluster: ClickHouseCluster, keeper_node: str, port: tp.Optional[int] = None
    ) -> "KeeperClient":
        client = cls(
            cluster.server_bin_path,
            cluster.get_instance_ip(keeper_node),
            port or cluster.zookeeper_port,
        )

        try:
            yield client
        finally:
            client.stop()


def get_keeper_socket(cluster, node, port=9181):
@@ -70,14 +249,14 @@ def get_fake_zk(cluster, node, timeout: float = 30.0) -> KazooClient:
    return _fake


def get_config_str(zk: KeeperClient) -> str:
    """
    Return decoded contents of /keeper/config node
    """
    return zk.get("/keeper/config")


def wait_configs_equal(left_config: str, right_zk: KeeperClient, timeout: float = 30.0):
    """
    Check whether get /keeper/config result in left_config is equal
    to get /keeper/config on right_zk ZK connection.
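A short usage sketch of the updated helpers, following the pattern the reconfiguration tests below use; the started `cluster` object and the node names are assumptions:

```python
import helpers.keeper_utils as ku

# Connect keeper-client sessions to two Keeper nodes of a started cluster.
zk1 = ku.KeeperClient(cluster.server_bin_path, cluster.get_instance_ip("node1"), 9181)
zk2 = ku.KeeperClient(cluster.server_bin_path, cluster.get_instance_ip("node2"), 9181)

config = ku.get_config_str(zk1)      # raft membership as stored in /keeper/config
ku.wait_configs_equal(config, zk2)   # block until node2 reports the same config

zk1.stop()
zk2.stop()
```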


@@ -1,7 +1,7 @@
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
from helpers.keeper_utils import KeeperClient

cluster = ClickHouseCluster(__file__)
@@ -24,39 +24,28 @@ def started_cluster():
        cluster.shutdown()


@pytest.fixture(scope="function")
def client(started_cluster):
    with KeeperClient.from_cluster(cluster, "zoo1") as keeper_client:
        yield keeper_client


def test_big_family(client: KeeperClient):
    client.touch("/test_big_family")
    client.touch("/test_big_family/1")
    client.touch("/test_big_family/1/1")
    client.touch("/test_big_family/1/2")
    client.touch("/test_big_family/1/3")
    client.touch("/test_big_family/1/4")
    client.touch("/test_big_family/1/5")
    client.touch("/test_big_family/2")
    client.touch("/test_big_family/2/1")
    client.touch("/test_big_family/2/2")
    client.touch("/test_big_family/2/3")

    response = client.find_big_family("/test_big_family")

    assert response == TSV(
        [
            ["/test_big_family/1", "5"],
            ["/test_big_family/2", "3"],
@@ -71,34 +60,33 @@ def test_big_family():
        ]
    )

    response = client.find_big_family("/test_big_family", 1)

    assert response == TSV(
        [
            ["/test_big_family/1", "5"],
        ]
    )


def test_find_super_nodes(client: KeeperClient):
    client.touch("/test_find_super_nodes")
    client.touch("/test_find_super_nodes/1")
    client.touch("/test_find_super_nodes/1/1")
    client.touch("/test_find_super_nodes/1/2")
    client.touch("/test_find_super_nodes/1/3")
    client.touch("/test_find_super_nodes/1/4")
    client.touch("/test_find_super_nodes/1/5")
    client.touch("/test_find_super_nodes/2")
    client.touch("/test_find_super_nodes/2/1")
    client.touch("/test_find_super_nodes/2/2")
    client.touch("/test_find_super_nodes/2/3")
    client.touch("/test_find_super_nodes/2/4")

    client.cd("/test_find_super_nodes")

    response = client.find_super_nodes(4)
    assert response == TSV(
        [
            ["/test_find_super_nodes/1", "5"],
            ["/test_find_super_nodes/2", "4"],
@@ -106,41 +94,38 @@ def test_find_super_nodes():
    )


def test_delete_stale_backups(client: KeeperClient):
    client.touch("/clickhouse")
    client.touch("/clickhouse/backups")
    client.touch("/clickhouse/backups/1")
    client.touch("/clickhouse/backups/1/stage")
    client.touch("/clickhouse/backups/1/stage/alive123")
    client.touch("/clickhouse/backups/2")
    client.touch("/clickhouse/backups/2/stage")
    client.touch("/clickhouse/backups/2/stage/dead123")

    response = client.delete_stale_backups()

    assert response == (
        'Found backup "/clickhouse/backups/1", checking if it\'s active\n'
        'Backup "/clickhouse/backups/1" is active, not going to delete\n'
        'Found backup "/clickhouse/backups/2", checking if it\'s active\n'
        'Backup "/clickhouse/backups/2" is not active, deleting it'
    )

    assert client.ls("/clickhouse/backups") == ["1"]


def test_base_commands(client: KeeperClient):
    client.create("/test_create_zk_node1", "testvalue1")
    client.create("/test_create_zk_node_2", "testvalue2")
    assert client.get("/test_create_zk_node1") == "testvalue1"

    client.create("/123", "1=2")
    client.create("/123/321", "'foo;bar'")
    assert client.get("/123") == "1=2"
    assert client.get("/123/321") == "foo;bar"


def test_four_letter_word_commands(client: KeeperClient):
    assert client.execute_query("ruok") == "imok"


@@ -1,11 +1,10 @@
#!/usr/bin/env python3

import pytest
from helpers.cluster import ClickHouseCluster, ClickHouseInstance
import helpers.keeper_utils as ku
import os
import typing as tp

cluster = ClickHouseCluster(__file__)
CONFIG_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "configs")
@@ -19,11 +18,7 @@ part_of_cluster = "now this node is the part of cluster"
zk1, zk2, zk3 = None, None, None


@pytest.fixture(scope="module", autouse=True)
def started_cluster():
    try:
        cluster.start()
@@ -43,21 +38,28 @@ def started_cluster():
        yield cluster

    finally:
        conn: tp.Optional[ku.KeeperClient]
        for conn in [zk1, zk2, zk3]:
            if conn is not None:
                conn.stop()

        cluster.shutdown()


def create_client(node: ClickHouseInstance):
    return ku.KeeperClient(
        cluster.server_bin_path, cluster.get_instance_ip(node.name), 9181
    )


def test_reconfig_add():
    """
    Add a node to another node. Then add another node to two.
    """
    global zk1, zk2, zk3

    zk1 = create_client(node1)
    config = zk1.get("/keeper/config")
    print("Initial config", config)

    assert len(config.split("\n")) == 1
@@ -65,24 +67,20 @@ def test_reconfig_add(started_cluster):
    assert "node2" not in config
    assert "node3" not in config

    with pytest.raises(ku.KeeperException):
        # duplicate id with different endpoint
        zk1.reconfig(joining="server.1=localhost:1337", leaving=None, new_members=None)

    with pytest.raises(ku.KeeperException):
        # duplicate endpoint
        zk1.reconfig(joining="server.8=node1:9234", leaving=None, new_members=None)

    for i in range(100):
        zk1.create(f"/test_three_{i}", "somedata")

    node2.start_clickhouse()
    config = zk1.reconfig(joining="server.2=node2:9234", leaving=None, new_members=None)
    ku.wait_until_connected(cluster, node2)

    print("After adding 2", config)
    assert len(config.split("\n")) == 2
@@ -90,12 +88,12 @@ def test_reconfig_add(started_cluster):
    assert "node2" in config
    assert "node3" not in config

    zk2 = create_client(node2)
    ku.wait_configs_equal(config, zk2)

    for i in range(100):
        assert zk2.exists(f"/test_three_{i}")
        zk2.create(f"/test_three_{100 + i}", "somedata")

    # Why not both?
    # One node will process add_srv request, other will pull out updated config, apply
@@ -107,23 +105,19 @@ def test_reconfig_add(started_cluster):
    assert node2.contains_in_log(part_of_cluster)

    zk1.stop()
    zk1 = create_client(node1)
    zk1.sync("/test_three_0")

    for i in range(200):
        assert zk1.exists(f"/test_three_{i}")

    for i in range(100):
        zk2.create(f"/test_four_{i}", "somedata")

    node3.start_clickhouse()
    config = zk2.reconfig(joining="server.3=node3:9234", leaving=None, new_members=None)
    ku.wait_until_connected(cluster, node3)

    print("After adding 3", config)
    assert len(config.split("\n")) == 3
@@ -131,25 +125,23 @@ def test_reconfig_add(started_cluster):
    assert "node2" in config
    assert "node3" in config

    zk3 = create_client(node3)
    ku.wait_configs_equal(config, zk3)

    for i in range(100):
        assert zk3.exists(f"/test_four_{i}")
        zk3.create(f"/test_four_{100 + i}", "somedata")

    zk1.stop()
    zk1 = create_client(node1)
    zk1.sync("/test_four_0")

    zk2.stop()
    zk2 = create_client(node2)
    zk2.sync("/test_four_0")

    for i in range(200):
        assert zk1.exists(f"/test_four_{i}")
        assert zk2.exists(f"/test_four_{i}")

    assert node3.contains_in_log(part_of_cluster)


@@ -1,11 +1,11 @@
#!/usr/bin/env python3

import subprocess
import pytest
from helpers.cluster import ClickHouseCluster, ClickHouseInstance
import helpers.keeper_utils as ku
import os
import typing as tp

cluster = ClickHouseCluster(__file__)
CONFIG_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "configs")
@@ -23,16 +23,18 @@ def started_cluster():
        cluster.start()
        yield cluster
    finally:
        conn: tp.Optional[ku.KeeperClient]
        for conn in [zk1, zk2, zk3]:
            if conn:
                conn.stop()

        cluster.shutdown()


def create_client(node: ClickHouseInstance):
    return ku.KeeperClient(
        cluster.server_bin_path, cluster.get_instance_ip(node.name), 9181
    )


def test_reconfig_remove_followers_from_3(started_cluster):
@@ -42,9 +44,9 @@ def test_reconfig_remove_followers_from_3(started_cluster):
    Check that remaining node is in standalone mode.
    """
    global zk1, zk2, zk3
    zk1 = create_client(node1)
    config = zk1.get("/keeper/config")
    print("Initial config", config)

    assert len(config.split("\n")) == 3
@@ -52,36 +54,33 @@ def test_reconfig_remove_followers_from_3(started_cluster):
    assert "node2" in config
    assert "node3" in config

    with pytest.raises(ValueError):
        zk1.reconfig(joining=None, leaving=None, new_members=None)

    with pytest.raises(ku.KeeperException):
        # bulk reconfiguration is not supported
        zk1.reconfig(joining=None, leaving=None, new_members="3")

    with pytest.raises(ValueError):
        zk1.reconfig(joining="1", leaving="1", new_members="3")

    with pytest.raises(ku.KeeperException):
        # at least one node must be left
        zk1.reconfig(joining=None, leaving="1,2,3", new_members=None)

    for i in range(100):
        zk1.create(f"/test_two_{i}", "somedata")

    zk2 = create_client(node2)
    zk2.sync("/test_two_0")
    ku.wait_configs_equal(config, zk2)

    zk3 = create_client(node3)
    zk3.sync("/test_two_0")
    ku.wait_configs_equal(config, zk3)

    for i in range(100):
        assert zk2.exists(f"test_two_{i}")
        assert zk3.exists(f"test_two_{i}")

    config = zk1.reconfig(joining=None, leaving="3", new_members=None)

    print("After removing 3", config)
    assert len(config.split("\n")) == 2
@@ -90,35 +89,26 @@ def test_reconfig_remove_followers_from_3(started_cluster):
    assert "node3" not in config

    zk2.stop()
    zk2 = create_client(node2)
    ku.wait_configs_equal(config, zk2)

    for i in range(100):
        assert zk2.exists(f"test_two_{i}")
        zk2.create(f"/test_two_{100 + i}", "otherdata")

    zk1.stop()
    zk1 = create_client(node1)
    zk1.sync("/test_two_0")

    for i in range(200):
        assert zk1.exists(f"test_two_{i}")

    assert node3.contains_in_log(log_msg_removed)

    for i in range(100):
        zk2.create(f"/test_two_{200 + i}", "otherdata")

    config = zk1.reconfig(joining=None, leaving="2", new_members=None)

    print("After removing 2", config)
    assert len(config.split("\n")) == 1
@@ -127,19 +117,12 @@ def test_reconfig_remove_followers_from_3(started_cluster):
    assert "node3" not in config

    zk1.stop()
    zk1 = create_client(node1)
    zk1.sync("/test_two_0")

    for i in range(300):
        assert zk1.exists(f"test_two_{i}")

    assert not node1.contains_in_log(log_msg_removed)
    assert node2.contains_in_log(log_msg_removed)
    assert "Mode: standalone" in zk1.execute_query("stat")


@@ -1,11 +1,11 @@
#!/usr/bin/env python3

import pytest
from helpers.cluster import ClickHouseCluster, ClickHouseInstance
import helpers.keeper_utils as ku
import os
import typing as tp

cluster = ClickHouseCluster(__file__)
CONFIG_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "configs")
@@ -26,49 +26,51 @@ def started_cluster():
        cluster.start()
        yield cluster
    finally:
        conn: tp.Optional[ku.KeeperClient]
        for conn in [zk1, zk2, zk3, zk4, zk5]:
            if conn:
                conn.stop()

        cluster.shutdown()


def create_client(node: ClickHouseInstance):
    return ku.KeeperClient(
        cluster.server_bin_path, cluster.get_instance_ip(node.name), 9181
    )


def test_reconfig_remove_2_and_leader(started_cluster):
    """
    Remove 2 followers from a cluster of 5. Remove leader from 3 nodes.
    """
    global zk1, zk2, zk3, zk4, zk5

    zk1 = create_client(node1)
    config = ku.get_config_str(zk1)
    print("Initial config", config)

    assert len(config.split("\n")) == 5

    for i in range(100):
        zk1.create(f"/test_two_{i}", "somedata")

    zk4 = create_client(node4)
    zk4.sync("/test_two_0")
    ku.wait_configs_equal(config, zk4)

    zk5 = create_client(node5)
    zk5.sync("/test_two_0")
    ku.wait_configs_equal(config, zk5)

    for i in range(100):
        assert zk4.exists(f"test_two_{i}")
        assert zk5.exists(f"test_two_{i}")
        zk4.create(f"/test_two_{100 + i}", "otherdata")

    zk2 = create_client(node2)
    config = zk2.reconfig(joining=None, leaving="4,5", new_members=None)

    print("After removing 4,5", config)
    assert len(config.split("\n")) == 3
@@ -79,27 +81,14 @@ def test_reconfig_remove_2_and_leader(started_cluster):
    assert "node5" not in config

    zk1.stop()
    zk1 = create_client(node1)
    zk1.sync("/test_two_0")
    ku.wait_configs_equal(config, zk1)

    for i in range(200):
        assert zk1.exists(f"test_two_{i}")
        assert zk2.exists(f"test_two_{i}")

    assert not node1.contains_in_log(log_msg_removed)
    assert not node2.contains_in_log(log_msg_removed)
@@ -110,11 +99,10 @@ def test_reconfig_remove_2_and_leader(started_cluster):
    assert ku.is_leader(cluster, node1)

    for i in range(100):
        zk1.create(f"/test_leader_{i}", "somedata")

    # when a leader gets a remove request, it must yield leadership
    config = zk1.reconfig(joining=None, leaving="1", new_members=None)

    print("After removing 1 (leader)", config)
    assert len(config.split("\n")) == 2
@@ -125,24 +113,17 @@ def test_reconfig_remove_2_and_leader(started_cluster):
    assert "node5" not in config

    zk2.stop()
    zk2 = create_client(node2)
    zk2.sync("/test_leader_0")
    ku.wait_configs_equal(config, zk2)

    zk3 = create_client(node3)
    zk3.sync("/test_leader_0")
    ku.wait_configs_equal(config, zk3)

    for i in range(100):
        assert zk2.exists(f"test_leader_{i}")
        assert zk3.exists(f"test_leader_{i}")

    assert node1.contains_in_log(log_msg_removed)
    assert not node2.contains_in_log(log_msg_removed)


@@ -1,11 +1,10 @@
#!/usr/bin/env python3

import pytest
from helpers.cluster import ClickHouseCluster, ClickHouseInstance
from os.path import join, dirname, realpath
import helpers.keeper_utils as ku
import typing as tp

cluster = ClickHouseCluster(__file__)
CONFIG_DIR = join(dirname(realpath(__file__)), "configs")
@@ -31,24 +30,26 @@ def started_cluster():
        yield cluster

    finally:
        conn: tp.Optional[ku.KeeperClient]
        for conn in [zk1, zk2, zk3, zk4]:
            if conn:
                conn.stop()

        cluster.shutdown()


def create_client(node: ClickHouseInstance):
    return ku.KeeperClient(
        cluster.server_bin_path, cluster.get_instance_ip(node.name), 9181
    )


def test_reconfig_replace_leader(started_cluster):
    """
    Remove leader from a cluster of 3 and add a new node via two commands.
    """
    global zk1, zk2, zk3, zk4

    zk1 = create_client(node1)
    config = ku.get_config_str(zk1)

    assert len(config.split("\n")) == 3
@@ -58,23 +59,22 @@ def test_reconfig_replace_leader(started_cluster):
    assert "node4" not in config

    for i in range(100):
        zk1.create(f"/test_four_{i}", "somedata")

    zk2 = create_client(node2)
    zk2.sync("/test_four_0")
    ku.wait_configs_equal(config, zk2)

    zk3 = create_client(node3)
    zk3.sync("/test_four_0")
    ku.wait_configs_equal(config, zk3)

    for i in range(100):
        assert zk2.exists(f"/test_four_{i}")
        assert zk3.exists(f"/test_four_{i}")

    assert ku.is_leader(cluster, node1)
    config = zk2.reconfig(joining=None, leaving="1", new_members=None)

    print("After removing 1 (leader)", config)
    assert len(config.split("\n")) == 2
@@ -85,17 +85,8 @@ def test_reconfig_replace_leader(started_cluster):
    ku.wait_configs_equal(config, zk2)

    node4.start_clickhouse()
    config = zk2.reconfig(joining="server.4=node4:9234", leaving=None, new_members=None)
    ku.wait_until_connected(cluster, node4)

    print("After adding 4", config)
@@ -105,22 +96,20 @@ def test_reconfig_replace_leader(started_cluster):
    assert "node3" in config
    assert "node4" in config

    zk4 = create_client(node4)
    ku.wait_configs_equal(config, zk4)

    for i in range(100):
        assert zk4.exists(f"test_four_{i}")
        zk4.create(f"/test_four_{100 + i}", "somedata")

    zk2.stop()
    zk2 = create_client(node2)
    zk2.sync("/test_four_0")
    ku.wait_configs_equal(config, zk2)

    zk3.stop()
    zk3 = create_client(node3)
    zk3.sync("/test_four_0")
    ku.wait_configs_equal(config, zk3)


@@ -1,35 +0,0 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<enable_reconfiguration>true</enable_reconfiguration>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>9234</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>


@@ -1,35 +0,0 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>2</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<enable_reconfiguration>true</enable_reconfiguration>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>9234</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>


@@ -1,35 +0,0 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>3</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<enable_reconfiguration>true</enable_reconfiguration>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>9234</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>


@@ -1,21 +0,0 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>4</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<enable_reconfiguration>true</enable_reconfiguration>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server> <id>2</id> <hostname>node2</hostname> <port>9234</port> </server>
<server> <id>3</id> <hostname>node3</hostname> <port>9234</port> </server>
<server> <id>4</id> <hostname>node4</hostname> <port>9234</port> </server>
</raft_configuration>
</keeper_server>
</clickhouse>


@@ -1,43 +0,0 @@
#!/usr/bin/env python3
import pytest
from helpers.cluster import ClickHouseCluster
from os.path import join, dirname, realpath
import time
import helpers.keeper_utils as ku
from kazoo.client import KazooClient, KazooState
cluster = ClickHouseCluster(__file__)
CONFIG_DIR = join(dirname(realpath(__file__)), "configs")
node1 = cluster.add_instance("node1", main_configs=["configs/keeper1.xml"])
node2 = cluster.add_instance("node2", main_configs=["configs/keeper2.xml"])
node3 = cluster.add_instance("node3", main_configs=["configs/keeper3.xml"])
node4 = cluster.add_instance("node4", stay_alive=True)
zk1, zk2, zk3, zk4 = None, None, None, None
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
node4.stop_clickhouse()
node4.copy_file_to_container(
join(CONFIG_DIR, "keeper4.xml"),
"/etc/clickhouse-server/config.d/keeper.xml",
)
yield cluster
finally:
for conn in [zk1, zk2, zk3, zk4]:
if conn:
conn.stop()
conn.close()
cluster.shutdown()
def get_fake_zk(node):
return ku.get_fake_zk(cluster, node)