Merge pull request #51117 from ClickHouse/pufit/keeper-client-improvements

Implementing new commands for keeper-client
This commit is contained in:
Yarik Briukhovetskyi 2023-08-01 17:47:33 +02:00 committed by GitHub
commit 104a4aaee5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 365 additions and 36 deletions

View File

@ -51,3 +51,7 @@ keeper foo bar
- `rmr <path>` -- Recursively deletes path. Confirmation required
- `flwc <command>` -- Executes four-letter-word command
- `help` -- Prints this message
- `get_stat [path]` -- Returns the node's stat (default `.`)
- `find_super_nodes <threshold> [path]` -- Finds nodes with number of children larger than some threshold for the given path (default `.`)
- `delete_stable_backups` -- Deletes ClickHouse nodes used for backups that are now inactive
- `find_big_family [path] [n]` -- Returns the top n nodes with the biggest family in the subtree (default path = `.` and n = 10)

View File

@ -1,5 +1,6 @@
#include "Commands.h"
#include <queue>
#include "KeeperClient.h"
@ -24,8 +25,18 @@ void LSCommand::execute(const ASTKeeperQuery * query, KeeperClient * client) con
else
path = client->cwd;
for (const auto & child : client->zookeeper->getChildren(path))
std::cout << child << " ";
auto children = client->zookeeper->getChildren(path);
std::sort(children.begin(), children.end());
bool need_space = false;
for (const auto & child : children)
{
if (std::exchange(need_space, true))
std::cout << " ";
std::cout << child;
}
std::cout << "\n";
}
@ -130,6 +141,173 @@ void GetCommand::execute(const ASTKeeperQuery * query, KeeperClient * client) co
std::cout << client->zookeeper->get(client->getAbsolutePath(query->args[0].safeGet<String>())) << "\n";
}
bool GetStatCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const
{
String arg;
if (!parseKeeperPath(pos, expected, arg))
return true;
node->args.push_back(std::move(arg));
return true;
}
void GetStatCommand::execute(const ASTKeeperQuery * query, KeeperClient * client) const
{
Coordination::Stat stat;
String path;
if (!query->args.empty())
path = client->getAbsolutePath(query->args[0].safeGet<String>());
else
path = client->cwd;
client->zookeeper->get(path, &stat);
std::cout << "cZxid = " << stat.czxid << "\n";
std::cout << "mZxid = " << stat.mzxid << "\n";
std::cout << "pZxid = " << stat.pzxid << "\n";
std::cout << "ctime = " << stat.ctime << "\n";
std::cout << "mtime = " << stat.mtime << "\n";
std::cout << "version = " << stat.version << "\n";
std::cout << "cversion = " << stat.cversion << "\n";
std::cout << "aversion = " << stat.aversion << "\n";
std::cout << "ephemeralOwner = " << stat.ephemeralOwner << "\n";
std::cout << "dataLength = " << stat.dataLength << "\n";
std::cout << "numChildren = " << stat.numChildren << "\n";
}
bool FindSuperNodes::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const
{
ASTPtr threshold;
if (!ParserUnsignedInteger{}.parse(pos, threshold, expected))
return false;
node->args.push_back(threshold->as<ASTLiteral &>().value);
String path;
if (!parseKeeperPath(pos, expected, path))
path = ".";
node->args.push_back(std::move(path));
return true;
}
void FindSuperNodes::execute(const ASTKeeperQuery * query, KeeperClient * client) const
{
auto threshold = query->args[0].safeGet<UInt64>();
auto path = client->getAbsolutePath(query->args[1].safeGet<String>());
Coordination::Stat stat;
client->zookeeper->get(path, &stat);
if (stat.numChildren >= static_cast<Int32>(threshold))
{
std::cout << static_cast<String>(path) << "\t" << stat.numChildren << "\n";
return;
}
auto children = client->zookeeper->getChildren(path);
std::sort(children.begin(), children.end());
for (const auto & child : children)
{
auto next_query = *query;
next_query.args[1] = DB::Field(path / child);
execute(&next_query, client);
}
}
bool DeleteStableBackups::parse(IParser::Pos & /* pos */, std::shared_ptr<ASTKeeperQuery> & /* node */, Expected & /* expected */) const
{
return true;
}
void DeleteStableBackups::execute(const ASTKeeperQuery * /* query */, KeeperClient * client) const
{
client->askConfirmation(
"You are going to delete all inactive backups in /clickhouse/backups.",
[client]
{
fs::path backup_root = "/clickhouse/backups";
auto backups = client->zookeeper->getChildren(backup_root);
std::sort(backups.begin(), backups.end());
for (const auto & child : backups)
{
auto backup_path = backup_root / child;
std::cout << "Found backup " << backup_path << ", checking if it's active\n";
String stage_path = backup_path / "stage";
auto stages = client->zookeeper->getChildren(stage_path);
bool is_active = false;
for (const auto & stage : stages)
{
if (startsWith(stage, "alive"))
{
is_active = true;
break;
}
}
if (is_active)
{
std::cout << "Backup " << backup_path << " is active, not going to delete\n";
continue;
}
std::cout << "Backup " << backup_path << " is not active, deleting it\n";
client->zookeeper->removeRecursive(backup_path);
}
});
}
bool FindBigFamily::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const
{
String path;
if (!parseKeeperPath(pos, expected, path))
path = ".";
node->args.push_back(std::move(path));
ASTPtr count;
if (ParserUnsignedInteger{}.parse(pos, count, expected))
node->args.push_back(count->as<ASTLiteral &>().value);
else
node->args.push_back(UInt64(10));
return true;
}
void FindBigFamily::execute(const ASTKeeperQuery * query, KeeperClient * client) const
{
auto path = client->getAbsolutePath(query->args[0].safeGet<String>());
auto n = query->args[1].safeGet<UInt64>();
std::vector<std::tuple<Int32, String>> result;
std::queue<fs::path> queue;
queue.push(path);
while (!queue.empty())
{
auto next_path = queue.front();
queue.pop();
auto children = client->zookeeper->getChildren(next_path);
std::transform(children.cbegin(), children.cend(), children.begin(), [&](const String & child) { return next_path / child; });
auto response = client->zookeeper->get(children);
for (size_t i = 0; i < response.size(); ++i)
{
result.emplace_back(response[i].stat.numChildren, children[i]);
queue.push(children[i]);
}
}
std::sort(result.begin(), result.end(), std::greater());
for (UInt64 i = 0; i < std::min(result.size(), static_cast<size_t>(n)); ++i)
std::cout << std::get<1>(result[i]) << "\t" << std::get<0>(result[i]) << "\n";
}
bool RMCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const
{
String arg;
@ -170,7 +348,7 @@ bool HelpCommand::parse(IParser::Pos & /* pos */, std::shared_ptr<ASTKeeperQuery
void HelpCommand::execute(const ASTKeeperQuery * /* query */, KeeperClient * /* client */) const
{
for (const auto & pair : KeeperClient::commands)
std::cout << pair.second->getHelpMessage() << "\n";
std::cout << pair.second->generateHelpString() << "\n";
}
bool FourLetterWordCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const

View File

@ -21,6 +21,12 @@ public:
virtual String getName() const = 0;
virtual ~IKeeperClientCommand() = default;
String generateHelpString() const
{
return fmt::vformat(getHelpMessage(), fmt::make_format_args(getName()));
}
};
using Command = std::shared_ptr<IKeeperClientCommand>;
@ -34,7 +40,7 @@ class LSCommand : public IKeeperClientCommand
void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;
String getHelpMessage() const override { return "ls [path] -- Lists the nodes for the given path (default: cwd)"; }
String getHelpMessage() const override { return "{} [path] -- Lists the nodes for the given path (default: cwd)"; }
};
class CDCommand : public IKeeperClientCommand
@ -45,7 +51,7 @@ class CDCommand : public IKeeperClientCommand
void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;
String getHelpMessage() const override { return "cd [path] -- Change the working path (default `.`)"; }
String getHelpMessage() const override { return "{} [path] -- Change the working path (default `.`)"; }
};
class SetCommand : public IKeeperClientCommand
@ -58,7 +64,7 @@ class SetCommand : public IKeeperClientCommand
String getHelpMessage() const override
{
return "set <path> <value> [version] -- Updates the node's value. Only update if version matches (default: -1)";
return "{} <path> <value> [version] -- Updates the node's value. Only update if version matches (default: -1)";
}
};
@ -70,7 +76,7 @@ class CreateCommand : public IKeeperClientCommand
void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;
String getHelpMessage() const override { return "create <path> <value> -- Creates new node"; }
String getHelpMessage() const override { return "{} <path> <value> -- Creates new node"; }
};
class GetCommand : public IKeeperClientCommand
@ -81,9 +87,63 @@ class GetCommand : public IKeeperClientCommand
void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;
String getHelpMessage() const override { return "get <path> -- Returns the node's value"; }
String getHelpMessage() const override { return "{} <path> -- Returns the node's value"; }
};
class GetStatCommand : public IKeeperClientCommand
{
String getName() const override { return "get_stat"; }
bool parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const override;
void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;
String getHelpMessage() const override { return "{} [path] -- Returns the node's stat (default `.`)"; }
};
class FindSuperNodes : public IKeeperClientCommand
{
String getName() const override { return "find_super_nodes"; }
bool parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const override;
void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;
String getHelpMessage() const override
{
return "{} <threshold> [path] -- Finds nodes with number of children larger than some threshold for the given path (default `.`)";
}
};
class DeleteStableBackups : public IKeeperClientCommand
{
String getName() const override { return "delete_stable_backups"; }
bool parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const override;
void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;
String getHelpMessage() const override
{
return "{} -- Deletes ClickHouse nodes used for backups that are now inactive";
}
};
class FindBigFamily : public IKeeperClientCommand
{
String getName() const override { return "find_big_family"; }
bool parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const override;
void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;
String getHelpMessage() const override
{
return "{} [path] [n] -- Returns the top n nodes with the biggest family in the subtree (default path = `.` and n = 10)";
}
};
class RMCommand : public IKeeperClientCommand
{
String getName() const override { return "rm"; }
@ -92,7 +152,7 @@ class RMCommand : public IKeeperClientCommand
void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;
String getHelpMessage() const override { return "remove <path> -- Remove the node"; }
String getHelpMessage() const override { return "{} <path> -- Remove the node"; }
};
class RMRCommand : public IKeeperClientCommand
@ -103,7 +163,7 @@ class RMRCommand : public IKeeperClientCommand
void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;
String getHelpMessage() const override { return "rmr <path> -- Recursively deletes path. Confirmation required"; }
String getHelpMessage() const override { return "{} <path> -- Recursively deletes path. Confirmation required"; }
};
class HelpCommand : public IKeeperClientCommand
@ -114,7 +174,7 @@ class HelpCommand : public IKeeperClientCommand
void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;
String getHelpMessage() const override { return "help -- Prints this message"; }
String getHelpMessage() const override { return "{} -- Prints this message"; }
};
class FourLetterWordCommand : public IKeeperClientCommand
@ -125,7 +185,7 @@ class FourLetterWordCommand : public IKeeperClientCommand
void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;
String getHelpMessage() const override { return "flwc <command> -- Executes four-letter-word command"; }
String getHelpMessage() const override { return "{} <command> -- Executes four-letter-word command"; }
};
}

View File

@ -177,6 +177,10 @@ void KeeperClient::initialize(Poco::Util::Application & /* self */)
std::make_shared<SetCommand>(),
std::make_shared<CreateCommand>(),
std::make_shared<GetCommand>(),
std::make_shared<GetStatCommand>(),
std::make_shared<FindSuperNodes>(),
std::make_shared<DeleteStableBackups>(),
std::make_shared<FindBigFamily>(),
std::make_shared<RMCommand>(),
std::make_shared<RMRCommand>(),
std::make_shared<HelpCommand>(),

View File

@ -58,6 +58,7 @@ bool KeeperParser::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
return false;
String command_name(pos->begin, pos->end);
std::transform(command_name.begin(), command_name.end(), command_name.begin(), [](unsigned char c) { return std::tolower(c); });
Command command;
auto iter = KeeperClient::commands.find(command_name);

View File

@ -1,6 +1,7 @@
import pytest
from helpers.client import CommandRequest
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
cluster = ClickHouseCluster(__file__)
@ -13,7 +14,7 @@ node = cluster.add_instance(
)
@pytest.fixture(scope="module")
@pytest.fixture(scope="module", autouse=True)
def started_cluster():
try:
cluster.start()
@ -23,41 +24,122 @@ def started_cluster():
cluster.shutdown()
def test_base_commands(started_cluster):
_ = started_cluster
command = CommandRequest(
def keeper_query(query: str):
return CommandRequest(
[
started_cluster.server_bin_path,
cluster.server_bin_path,
"keeper-client",
"--host",
str(cluster.get_instance_ip("zoo1")),
"--port",
str(cluster.zookeeper_port),
"-q",
"create test_create_zk_node1 testvalue1;create test_create_zk_node_2 testvalue2;get test_create_zk_node1;",
query,
],
stdin="",
)
def test_big_family():
command = keeper_query(
"create test_big_family foo;"
"create test_big_family/1 foo;"
"create test_big_family/1/1 foo;"
"create test_big_family/1/2 foo;"
"create test_big_family/1/3 foo;"
"create test_big_family/1/4 foo;"
"create test_big_family/1/5 foo;"
"create test_big_family/2 foo;"
"create test_big_family/2/1 foo;"
"create test_big_family/2/2 foo;"
"create test_big_family/2/3 foo;"
"find_big_family test_big_family;"
)
assert command.get_answer() == TSV(
[
["/test_big_family/1", "5"],
["/test_big_family/2", "3"],
["/test_big_family/2/3", "0"],
["/test_big_family/2/2", "0"],
["/test_big_family/2/1", "0"],
["/test_big_family/1/5", "0"],
["/test_big_family/1/4", "0"],
["/test_big_family/1/3", "0"],
["/test_big_family/1/2", "0"],
["/test_big_family/1/1", "0"],
]
)
command = keeper_query("find_big_family test_big_family 1;")
assert command.get_answer() == TSV(
[
["/test_big_family/1", "5"],
]
)
def test_find_super_nodes():
command = keeper_query(
"create test_find_super_nodes foo;"
"create test_find_super_nodes/1 foo;"
"create test_find_super_nodes/1/1 foo;"
"create test_find_super_nodes/1/2 foo;"
"create test_find_super_nodes/1/3 foo;"
"create test_find_super_nodes/1/4 foo;"
"create test_find_super_nodes/1/5 foo;"
"create test_find_super_nodes/2 foo;"
"create test_find_super_nodes/2/1 foo;"
"create test_find_super_nodes/2/2 foo;"
"create test_find_super_nodes/2/3 foo;"
"create test_find_super_nodes/2/4 foo;"
"cd test_find_super_nodes;"
"find_super_nodes 4;"
)
assert command.get_answer() == TSV(
[
["/test_find_super_nodes/1", "5"],
["/test_find_super_nodes/2", "4"],
]
)
def test_delete_stable_backups():
command = keeper_query(
"create /clickhouse/backups foo;"
"create /clickhouse/backups/1 foo;"
"create /clickhouse/backups/1/stage foo;"
"create /clickhouse/backups/1/stage/alive123 foo;"
"create /clickhouse/backups/2 foo;"
"create /clickhouse/backups/2/stage foo;"
"create /clickhouse/backups/2/stage/dead123 foo;"
"delete_stable_backups;"
"y;"
"ls clickhouse/backups;"
)
assert command.get_answer() == (
"You are going to delete all inactive backups in /clickhouse/backups. Continue?\n"
'Found backup "/clickhouse/backups/1", checking if it\'s active\n'
'Backup "/clickhouse/backups/1" is active, not going to delete\n'
'Found backup "/clickhouse/backups/2", checking if it\'s active\n'
'Backup "/clickhouse/backups/2" is not active, deleting it\n'
"1\n"
)
def test_base_commands():
command = keeper_query(
"create test_create_zk_node1 testvalue1;"
"create test_create_zk_node_2 testvalue2;"
"get test_create_zk_node1;"
)
assert command.get_answer() == "testvalue1\n"
def test_four_letter_word_commands(started_cluster):
_ = started_cluster
command = CommandRequest(
[
started_cluster.server_bin_path,
"keeper-client",
"--host",
str(cluster.get_instance_ip("zoo1")),
"--port",
str(cluster.zookeeper_port),
"-q",
"ruok",
],
stdin="",
)
def test_four_letter_word_commands():
command = keeper_query("ruok")
assert command.get_answer() == "imok\n"