Implementing exists, sync commands. Better tests

This commit is contained in:
pufit 2023-09-06 20:36:39 -04:00
parent ed43a8f1f6
commit ef2350cfc7
17 changed files with 430 additions and 419 deletions

View File

@ -16,6 +16,8 @@ A client application to interact with clickhouse-keeper by its native protocol.
- `--session-timeout=TIMEOUT` — Set session timeout in seconds. Default value: 10s.
- `--operation-timeout=TIMEOUT` — Set operation timeout in seconds. Default value: 10s.
- `--history-file=FILE_PATH` — Set path of history file. Default value: `~/.keeper-client-history`.
- `--log-level=LEVEL` — Set log level. Default value: `information`.
- `--no-confirmation` — If set, will not require confirmation for several commands. Default value: `false` in interactive mode and `true` when a query is passed via `--query`.
- `--help` — Shows the help message.
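For example, the new flag is handy when scripting a single query non-interactively (host, port, and the query below are illustrative):

```bash
clickhouse-keeper-client --host localhost --port 9181 --no-confirmation -q "delete_stale_backups"
```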
## Example {#clickhouse-keeper-client-example}
@ -44,6 +46,7 @@ keeper foo bar
- `ls [path]` -- Lists the nodes for the given path (default: cwd)
- `cd [path]` -- Change the working path (default `.`)
- `exists <path>` -- Returns `1` if node exists, `0` otherwise
- `set <path> <value> [version]` -- Updates the node's value. Only update if version matches (default: -1)
- `create <path> <value> [mode]` -- Creates new node with the set value
- `touch <path>` -- Creates new node with an empty string as value. Doesn't throw an exception if the node already exists
@ -56,3 +59,5 @@ keeper foo bar
- `find_super_nodes <threshold> [path]` -- Finds nodes with number of children larger than some threshold for the given path (default `.`)
- `delete_stale_backups` -- Deletes ClickHouse nodes used for backups that are now inactive
- `find_big_family [path] [n]` -- Returns the top n nodes with the biggest family in the subtree (default path = `.` and n = 10)
- `sync <path>` -- Synchronizes the node between processes and the leader (see the session sketch after this list)
- `reconfig <add|remove|set> "<arg>" [version]` -- Reconfigures Keeper cluster. See https://clickhouse.com/docs/en/guides/sre/keeper/clickhouse-keeper#reconfiguration
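For illustration, a short interactive session exercising the two new commands might look like this (node name and output values are illustrative):

```bash
/ :) create demo_node some_value
/ :) exists demo_node
1
/ :) sync demo_node
/demo_node
/ :) exists missing_node
0
```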

View File

@ -158,6 +158,21 @@ void GetCommand::execute(const ASTKeeperQuery * query, KeeperClient * client) co
std::cout << client->zookeeper->get(client->getAbsolutePath(query->args[0].safeGet<String>())) << "\n";
}
bool ExistsCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, DB::Expected & expected) const
{
String path;
if (!parseKeeperPath(pos, expected, path))
return false;
node->args.push_back(std::move(path));
return true;
}
void ExistsCommand::execute(const DB::ASTKeeperQuery * query, DB::KeeperClient * client) const
{
std::cout << client->zookeeper->exists(client->getAbsolutePath(query->args[0].safeGet<String>())) << "\n";
}
bool GetStatCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const
{
String path;
@ -400,6 +415,21 @@ void ReconfigCommand::execute(const DB::ASTKeeperQuery * query, DB::KeeperClient
std::cout << response.value << '\n';
}
bool SyncCommand::parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, DB::Expected & expected) const
{
String path;
if (!parseKeeperPath(pos, expected, path))
return false;
node->args.push_back(std::move(path));
return true;
}
void SyncCommand::execute(const DB::ASTKeeperQuery * query, DB::KeeperClient * client) const
{
std::cout << client->zookeeper->sync(client->getAbsolutePath(query->args[0].safeGet<String>())) << "\n";
}
bool HelpCommand::parse(IParser::Pos & /* pos */, std::shared_ptr<ASTKeeperQuery> & /* node */, Expected & /* expected */) const
{
return true;

View File

@ -101,6 +101,17 @@ class GetCommand : public IKeeperClientCommand
String getHelpMessage() const override { return "{} <path> -- Returns the node's value"; }
};
class ExistsCommand : public IKeeperClientCommand
{
String getName() const override { return "exists"; }
bool parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const override;
void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;
String getHelpMessage() const override { return "{} <path> -- Returns `1` if node exists, `0` otherwise"; }
};
class GetStatCommand : public IKeeperClientCommand
{
String getName() const override { return "get_stat"; }
@ -179,7 +190,8 @@ class RMRCommand : public IKeeperClientCommand
class ReconfigCommand : public IKeeperClientCommand
{
enum class Operation : Int64 {
enum class Operation : Int64
{
ADD = 0,
REMOVE = 1,
SET = 2,
@ -191,7 +203,18 @@ class ReconfigCommand : public IKeeperClientCommand
void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;
String getHelpMessage() const override { return "{} <add|remove|set> \"<arg>\" [version] -- Reconfigures a ZooKeeper cluster. See https://clickhouse.com/docs/en/guides/sre/keeper/clickhouse-keeper#reconfiguration"; }
String getHelpMessage() const override { return "{} <add|remove|set> \"<arg>\" [version] -- Reconfigures Keeper cluster. See https://clickhouse.com/docs/en/guides/sre/keeper/clickhouse-keeper#reconfiguration"; }
};
class SyncCommand: public IKeeperClientCommand
{
String getName() const override { return "sync"; }
bool parse(IParser::Pos & pos, std::shared_ptr<ASTKeeperQuery> & node, Expected & expected) const override;
void execute(const ASTKeeperQuery * query, KeeperClient * client) const override;
String getHelpMessage() const override { return "{} <path> -- Synchronizes node between processes and leader"; }
};
class HelpCommand : public IKeeperClientCommand

View File

@ -173,6 +173,14 @@ void KeeperClient::defineOptions(Poco::Util::OptionSet & options)
Poco::Util::Option("log-level", "", "set log level")
.argument("<level>")
.binding("log-level"));
options.addOption(
Poco::Util::Option("no-confirmation", "", "if set, will not require a confirmation on several commands. default false for interactive and true for query")
.binding("no-confirmation"));
options.addOption(
Poco::Util::Option("tests-mode", "", "run keeper-client in a special mode for tests. all commands output are separated by special symbols. default false")
.binding("tests-mode"));
}
void KeeperClient::initialize(Poco::Util::Application & /* self */)
@ -187,6 +195,7 @@ void KeeperClient::initialize(Poco::Util::Application & /* self */)
std::make_shared<CreateCommand>(),
std::make_shared<TouchCommand>(),
std::make_shared<GetCommand>(),
std::make_shared<ExistsCommand>(),
std::make_shared<GetStatCommand>(),
std::make_shared<FindSuperNodes>(),
std::make_shared<DeleteStaleBackups>(),
@ -194,6 +203,7 @@ void KeeperClient::initialize(Poco::Util::Application & /* self */)
std::make_shared<RMCommand>(),
std::make_shared<RMRCommand>(),
std::make_shared<ReconfigCommand>(),
std::make_shared<SyncCommand>(),
std::make_shared<HelpCommand>(),
std::make_shared<FourLetterWordCommand>(),
});
@ -233,7 +243,6 @@ void KeeperClient::initialize(Poco::Util::Application & /* self */)
EventNotifier::init();
}
bool KeeperClient::processQueryText(const String & text)
{
if (exit_strings.find(text) != exit_strings.end())
@ -287,7 +296,7 @@ bool KeeperClient::processQueryText(const String & text)
return true;
}
void KeeperClient::runInteractive()
void KeeperClient::runInteractiveReplxx()
{
LineReader::Patterns query_extenders = {"\\"};
@ -321,6 +330,26 @@ void KeeperClient::runInteractive()
}
}
void KeeperClient::runInteractiveInputStream()
{
for (String input; std::getline(std::cin, input);)
{
if (!processQueryText(input))
break;
std::cout << "\a\a\a\a" << std::endl;
std::cerr << std::flush;
}
}
void KeeperClient::runInteractive()
{
if (config().hasOption("tests-mode"))
runInteractiveInputStream();
else
runInteractiveReplxx();
}
int KeeperClient::main(const std::vector<String> & /* args */)
{
if (config().hasOption("help"))
@ -370,9 +399,11 @@ int KeeperClient::main(const std::vector<String> & /* args */)
zk_args.operation_timeout_ms = config().getInt("operation-timeout", 10) * 1000;
zookeeper = std::make_unique<zkutil::ZooKeeper>(zk_args);
if (config().has("no-confirmation") || config().has("query"))
ask_confirmation = false;
if (config().has("query"))
{
ask_confirmation = false;
processQueryText(config().getString("query"));
}
else

View File

@ -49,6 +49,9 @@ public:
protected:
void runInteractive();
void runInteractiveReplxx();
void runInteractiveInputStream();
bool processQueryText(const String & text);
void loadCommands(std::vector<Command> && new_commands);

View File

@ -1,6 +1,164 @@
import io
import subprocess
import socket
import time
import typing as tp
import contextlib
import select
from kazoo.client import KazooClient
from helpers.cluster import ClickHouseCluster, ClickHouseInstance
from helpers.client import CommandRequest
def execute_keeper_client_query(cluster: ClickHouseCluster, node: ClickHouseInstance, query: str) -> str:
request = CommandRequest(
[
cluster.server_bin_path,
"keeper-client",
"--host",
str(cluster.get_instance_ip(node.name)),
"--port",
str(cluster.zookeeper_port),
"-q",
query,
],
stdin="",
)
return request.get_answer()
class KeeperException(Exception):
pass
class KeeperClient(object):
SEPARATOR = b'\a\a\a\a\n'
def __init__(self, bin_path: str, host: str, port: int):
self.bin_path = bin_path
self.host = host
self.port = port
self.proc = subprocess.Popen(
[
bin_path,
'keeper-client',
'--host',
host,
'--port',
str(port),
'--log-level',
'error',
'--tests-mode',
'--no-confirmation',
],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
self.poller = select.epoll()
self.poller.register(self.proc.stdout)
self.poller.register(self.proc.stderr)
self._fd_nums = {
self.proc.stdout.fileno(): self.proc.stdout,
self.proc.stderr.fileno(): self.proc.stderr,
}
self.stopped = False
def execute_query(self, query: str, timeout: float = 10.) -> str:
output = io.BytesIO()
# send the query; in --tests-mode keeper-client terminates each command's output with the separator sequence
self.proc.stdin.write(query.encode() + b'\n')
self.proc.stdin.flush()
events = self.poller.poll(timeout)
for fd_num, event in events:
if event & (select.EPOLLIN | select.EPOLLPRI):
file = self._fd_nums[fd_num]
if file == self.proc.stdout:
while True:
chunk = file.readline()
if chunk.endswith(self.SEPARATOR):
break
output.write(chunk)
elif file == self.proc.stderr:
# on error the separator still arrives on stdout, while the message is delivered on stderr
assert self.proc.stdout.readline() == self.SEPARATOR
raise KeeperException(self.proc.stderr.readline().strip().decode())
else:
raise ValueError(f'Failed to read from pipe. Flag {event}')
data = output.getvalue().strip().decode()
return data
def cd(self, path: str, timeout: float = 10.):
self.execute_query(f'cd {path}', timeout)
def ls(self, path: str, timeout: float = 10.) -> list[str]:
return self.execute_query(f'ls {path}', timeout).split(' ')
def create(self, path: str, value: str, timeout: float = 10.):
self.execute_query(f'create {path} {value}', timeout)
def get(self, path: str, timeout: float = 10.) -> str:
return self.execute_query(f'get {path}', timeout)
def exists(self, path: str, timeout: float = 10.) -> bool:
return bool(int(self.execute_query(f'exists {path}', timeout)))
def stop(self):
if not self.stopped:
self.stopped = True
self.proc.communicate(b'exit\n', timeout=10.)
def sync(self, path: str, timeout: float = 10.):
self.execute_query(f'sync {path}', timeout)
def touch(self, path: str, timeout: float = 10.):
self.execute_query(f'touch {path}', timeout)
def find_big_family(self, path: str, n: int = 10, timeout: float = 10.) -> str:
return self.execute_query(f'find_big_family {path} {n}', timeout)
def find_super_nodes(self, threshold: int, timeout: float = 10.) -> str:
return self.execute_query(f'find_super_nodes {threshold}', timeout)
def delete_stale_backups(self, timeout: float = 10.) -> str:
return self.execute_query('delete_stale_backups', timeout)
def reconfig(self, joining: tp.Optional[str], leaving: tp.Optional[str], new_members: tp.Optional[str], timeout: float = 10.) -> str:
if bool(joining) + bool(leaving) + bool(new_members) != 1:
raise ValueError('Exactly one of joining, leaving or new_members must be specified')
if joining is not None:
operation = 'add'
elif leaving is not None:
operation = 'remove'
elif new_members is not None:
operation = 'set'
else:
raise ValueError('At least one of joining, leaving or new_members must be specified')
return self.execute_query(f'reconfig {operation} {joining or leaving or new_members}', timeout)
@classmethod
@contextlib.contextmanager
def from_cluster(cls, cluster: ClickHouseCluster, keeper_node: str, port: tp.Optional[int] = None) -> 'KeeperClient':
client = cls(cluster.server_bin_path, cluster.get_instance_ip(keeper_node), port or cluster.zookeeper_port)
try:
yield client
finally:
client.stop()
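A minimal usage sketch of this helper, mirroring how the tests below obtain a client (cluster setup elided; the node name "zoo1" and paths are illustrative):

with KeeperClient.from_cluster(cluster, "zoo1") as client:
    client.create("/demo", "value")
    assert client.exists("/demo")
    assert client.get("/demo") == "value"
    client.sync("/demo")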
def get_keeper_socket(cluster, node, port=9181):
@ -70,14 +228,14 @@ def get_fake_zk(cluster, node, timeout: float = 30.0) -> KazooClient:
return _fake
def get_config_str(zk: KazooClient) -> str:
def get_config_str(zk: KeeperClient) -> str:
"""
Return contents of the /keeper/config node
"""
return zk.get("/keeper/config")[0].decode("utf-8")
return zk.get("/keeper/config")
def wait_configs_equal(left_config: str, right_zk: KazooClient, timeout: float = 30.0):
def wait_configs_equal(left_config: str, right_zk: KeeperClient, timeout: float = 30.0):
"""
Check whether get /keeper/config result in left_config is equal
to get /keeper/config on right_zk ZK connection.

View File

@ -1,7 +1,7 @@
import pytest
from helpers.client import CommandRequest
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
from helpers.keeper_utils import KeeperClient
cluster = ClickHouseCluster(__file__)
@ -24,39 +24,28 @@ def started_cluster():
cluster.shutdown()
def keeper_query(query: str):
return CommandRequest(
[
cluster.server_bin_path,
"keeper-client",
"--host",
str(cluster.get_instance_ip("zoo1")),
"--port",
str(cluster.zookeeper_port),
"-q",
query,
],
stdin="",
)
@pytest.fixture(scope="function")
def client(started_cluster):
with KeeperClient.from_cluster(cluster, "zoo1") as keeper_client:
yield keeper_client
def test_big_family():
command = keeper_query(
"touch test_big_family;"
"touch test_big_family/1;"
"touch test_big_family/1/1;"
"touch test_big_family/1/2;"
"touch test_big_family/1/3;"
"touch test_big_family/1/4;"
"touch test_big_family/1/5;"
"touch test_big_family/2;"
"touch test_big_family/2/1;"
"touch test_big_family/2/2;"
"touch test_big_family/2/3;"
"find_big_family test_big_family;"
)
def test_big_family(client: KeeperClient):
client.touch("/test_big_family")
client.touch("/test_big_family/1")
client.touch("/test_big_family/1/1")
client.touch("/test_big_family/1/2")
client.touch("/test_big_family/1/3")
client.touch("/test_big_family/1/4")
client.touch("/test_big_family/1/5")
client.touch("/test_big_family/2")
client.touch("/test_big_family/2/1")
client.touch("/test_big_family/2/2")
client.touch("/test_big_family/2/3")
assert command.get_answer() == TSV(
response = client.find_big_family("/test_big_family")
assert response == TSV(
[
["/test_big_family/1", "5"],
["/test_big_family/2", "3"],
@ -71,34 +60,33 @@ def test_big_family():
]
)
command = keeper_query("find_big_family test_big_family 1;")
response = client.find_big_family("/test_big_family", 1)
assert command.get_answer() == TSV(
assert response == TSV(
[
["/test_big_family/1", "5"],
]
)
def test_find_super_nodes():
command = keeper_query(
"touch test_find_super_nodes;"
"touch test_find_super_nodes/1;"
"touch test_find_super_nodes/1/1;"
"touch test_find_super_nodes/1/2;"
"touch test_find_super_nodes/1/3;"
"touch test_find_super_nodes/1/4;"
"touch test_find_super_nodes/1/5;"
"touch test_find_super_nodes/2;"
"touch test_find_super_nodes/2/1;"
"touch test_find_super_nodes/2/2;"
"touch test_find_super_nodes/2/3;"
"touch test_find_super_nodes/2/4;"
"cd test_find_super_nodes;"
"find_super_nodes 4;"
)
def test_find_super_nodes(client: KeeperClient):
client.touch("/test_find_super_nodes")
client.touch("/test_find_super_nodes/1")
client.touch("/test_find_super_nodes/1/1")
client.touch("/test_find_super_nodes/1/2")
client.touch("/test_find_super_nodes/1/3")
client.touch("/test_find_super_nodes/1/4")
client.touch("/test_find_super_nodes/1/5")
client.touch("/test_find_super_nodes/2")
client.touch("/test_find_super_nodes/2/1")
client.touch("/test_find_super_nodes/2/2")
client.touch("/test_find_super_nodes/2/3")
client.touch("/test_find_super_nodes/2/4")
assert command.get_answer() == TSV(
client.cd("/test_find_super_nodes")
response = client.find_super_nodes(4)
assert response == TSV(
[
["/test_find_super_nodes/1", "5"],
["/test_find_super_nodes/2", "4"],
@ -106,41 +94,38 @@ def test_find_super_nodes():
)
def test_delete_stale_backups():
command = keeper_query(
"touch /clickhouse;"
"touch /clickhouse/backups;"
"touch /clickhouse/backups/1;"
"touch /clickhouse/backups/1/stage;"
"touch /clickhouse/backups/1/stage/alive123;"
"touch /clickhouse/backups/2;"
"touch /clickhouse/backups/2/stage;"
"touch /clickhouse/backups/2/stage/dead123;"
"delete_stale_backups;"
"y;"
"ls clickhouse/backups;"
)
def test_delete_stale_backups(client: KeeperClient):
client.touch("/clickhouse")
client.touch("/clickhouse/backups")
client.touch("/clickhouse/backups/1")
client.touch("/clickhouse/backups/1/stage")
client.touch("/clickhouse/backups/1/stage/alive123")
client.touch("/clickhouse/backups/2")
client.touch("/clickhouse/backups/2/stage")
client.touch("/clickhouse/backups/2/stage/dead123")
assert command.get_answer() == (
"You are going to delete all inactive backups in /clickhouse/backups. Continue?\n"
response = client.delete_stale_backups()
assert response == (
'Found backup "/clickhouse/backups/1", checking if it\'s active\n'
'Backup "/clickhouse/backups/1" is active, not going to delete\n'
'Found backup "/clickhouse/backups/2", checking if it\'s active\n'
'Backup "/clickhouse/backups/2" is not active, deleting it\n'
"1\n"
'Backup "/clickhouse/backups/2" is not active, deleting it'
)
def test_base_commands():
command = keeper_query(
"create test_create_zk_node1 testvalue1;"
"create test_create_zk_node_2 testvalue2;"
"get test_create_zk_node1;"
)
assert command.get_answer() == "testvalue1\n"
assert client.ls("/clickhouse/backups") == ["1"]
def test_four_letter_word_commands():
command = keeper_query("ruok")
assert command.get_answer() == "imok\n"
def test_base_commands(client: KeeperClient):
client.create("/test_create_zk_node1", "testvalue1")
client.create("/test_create_zk_node_2", "testvalue2")
assert client.get("/test_create_zk_node1") == "testvalue1"
client.create("/123", "1=2")
client.create("/123/321", "'foo;bar'")
assert client.get("/123") == "1=2"
assert client.get("/123/321") == "foo;bar"
def test_four_letter_word_commands(client: KeeperClient):
assert client.execute_query("ruok") == "imok"

View File

@ -1,11 +1,10 @@
#!/usr/bin/env python3
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.cluster import ClickHouseCluster, ClickHouseInstance
import helpers.keeper_utils as ku
import os
from kazoo.client import KazooClient
from kazoo.exceptions import BadArgumentsException
import typing as tp
cluster = ClickHouseCluster(__file__)
CONFIG_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "configs")
@ -19,11 +18,7 @@ part_of_cluster = "now this node is the part of cluster"
zk1, zk2, zk3 = None, None, None
def get_fake_zk(node):
return ku.get_fake_zk(cluster, node)
@pytest.fixture(scope="module")
@pytest.fixture(scope="module", autouse=True)
def started_cluster():
try:
cluster.start()
@ -43,21 +38,26 @@ def started_cluster():
yield cluster
finally:
conn: tp.Optional[ku.KeeperClient]
for conn in [zk1, zk2, zk3]:
if conn:
if conn is not None:
conn.stop()
conn.close()
cluster.shutdown()
def test_reconfig_add(started_cluster):
def create_client(node: ClickHouseInstance):
return ku.KeeperClient(cluster.server_bin_path, cluster.get_instance_ip(node.name), 9181)
def test_reconfig_add():
"""
Add a node to another node. Then add another node to two.
"""
global zk1, zk2, zk3
zk1 = create_client(node1)
zk1 = get_fake_zk(node1)
config = ku.get_config_str(zk1)
config = zk1.get("/keeper/config")
print("Initial config", config)
assert len(config.split("\n")) == 1
@ -65,24 +65,22 @@ def test_reconfig_add(started_cluster):
assert "node2" not in config
assert "node3" not in config
with pytest.raises(BadArgumentsException):
with pytest.raises(ku.KeeperException):
# duplicate id with different endpoint
zk1.reconfig(joining="server.1=localhost:1337", leaving=None, new_members=None)
with pytest.raises(BadArgumentsException):
with pytest.raises(ku.KeeperException):
# duplicate endpoint
zk1.reconfig(joining="server.8=node1:9234", leaving=None, new_members=None)
for i in range(100):
zk1.create(f"/test_three_{i}", b"somedata")
zk1.create(f"/test_three_{i}", "somedata")
node2.start_clickhouse()
config, _ = zk1.reconfig(
config = zk1.reconfig(
joining="server.2=node2:9234", leaving=None, new_members=None
)
ku.wait_until_connected(cluster, node2)
config = config.decode("utf-8")
print("After adding 2", config)
assert len(config.split("\n")) == 2
@ -90,12 +88,12 @@ def test_reconfig_add(started_cluster):
assert "node2" in config
assert "node3" not in config
zk2 = get_fake_zk(node2)
zk2 = create_client(node2)
ku.wait_configs_equal(config, zk2)
for i in range(100):
assert zk2.exists(f"/test_three_{i}") is not None
zk2.create(f"/test_three_{100 + i}", b"somedata")
assert zk2.exists(f"/test_three_{i}")
zk2.create(f"/test_three_{100 + i}", "somedata")
# Why not both?
# One node will process add_srv request, other will pull out updated config, apply
@ -107,23 +105,21 @@ def test_reconfig_add(started_cluster):
assert node2.contains_in_log(part_of_cluster)
zk1.stop()
zk1.close()
zk1 = get_fake_zk(node1)
zk1 = create_client(node1)
zk1.sync("/test_three_0")
for i in range(200):
assert zk1.exists(f"/test_three_{i}") is not None
assert zk1.exists(f"/test_three_{i}")
for i in range(100):
zk2.create(f"/test_four_{i}", b"somedata")
zk2.create(f"/test_four_{i}", "somedata")
node3.start_clickhouse()
config, _ = zk2.reconfig(
config = zk2.reconfig(
joining="server.3=node3:9234", leaving=None, new_members=None
)
ku.wait_until_connected(cluster, node3)
config = config.decode("utf-8")
print("After adding 3", config)
assert len(config.split("\n")) == 3
@ -131,25 +127,23 @@ def test_reconfig_add(started_cluster):
assert "node2" in config
assert "node3" in config
zk3 = get_fake_zk(node3)
zk3 = create_client(node3)
ku.wait_configs_equal(config, zk3)
for i in range(100):
assert zk3.exists(f"/test_four_{i}") is not None
zk3.create(f"/test_four_{100 + i}", b"somedata")
assert zk3.exists(f"/test_four_{i}")
zk3.create(f"/test_four_{100 + i}", "somedata")
zk1.stop()
zk1.close()
zk1 = get_fake_zk(node1)
zk1 = create_client(node1)
zk1.sync("/test_four_0")
zk2.stop()
zk2.close()
zk2 = get_fake_zk(node2)
zk2 = create_client(node2)
zk2.sync("/test_four_0")
for i in range(200):
assert zk1.exists(f"/test_four_{i}") is not None
assert zk2.exists(f"/test_four_{i}") is not None
assert zk1.exists(f"/test_four_{i}")
assert zk2.exists(f"/test_four_{i}")
assert node3.contains_in_log(part_of_cluster)

View File

@ -1,11 +1,11 @@
#!/usr/bin/env python3
import subprocess
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.cluster import ClickHouseCluster, ClickHouseInstance
import helpers.keeper_utils as ku
import os
from kazoo.client import KazooClient
from kazoo.exceptions import BadVersionException, BadArgumentsException
import typing as tp
cluster = ClickHouseCluster(__file__)
CONFIG_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "configs")
@ -23,16 +23,16 @@ def started_cluster():
cluster.start()
yield cluster
finally:
conn: tp.Optional[ku.KeeperClient]
for conn in [zk1, zk2, zk3]:
if conn:
conn.stop()
conn.close()
cluster.shutdown()
def get_fake_zk(node):
return ku.get_fake_zk(cluster, node)
def create_client(node: ClickHouseInstance):
return ku.KeeperClient(cluster.server_bin_path, cluster.get_instance_ip(node.name), 9181)
def test_reconfig_remove_followers_from_3(started_cluster):
@ -42,9 +42,9 @@ def test_reconfig_remove_followers_from_3(started_cluster):
Check that remaining node is in standalone mode.
"""
zk1 = get_fake_zk(node1)
config, _ = zk1.get("/keeper/config")
config = config.decode("utf-8")
global zk1, zk2, zk3
zk1 = create_client(node1)
config = zk1.get("/keeper/config")
print("Initial config", config)
assert len(config.split("\n")) == 3
@ -52,36 +52,33 @@ def test_reconfig_remove_followers_from_3(started_cluster):
assert "node2" in config
assert "node3" in config
with pytest.raises(BadVersionException):
zk1.reconfig(joining=None, leaving="1", new_members=None, from_config=20)
with pytest.raises(BadArgumentsException):
with pytest.raises(ValueError):
zk1.reconfig(joining=None, leaving=None, new_members=None)
with pytest.raises(BadArgumentsException):
with pytest.raises(ku.KeeperException):
# bulk reconfiguration is not supported
zk1.reconfig(joining=None, leaving=None, new_members="3")
with pytest.raises(BadArgumentsException):
with pytest.raises(ValueError):
zk1.reconfig(joining="1", leaving="1", new_members="3")
with pytest.raises(BadArgumentsException):
with pytest.raises(ku.KeeperException):
# at least one node must be left
zk1.reconfig(joining=None, leaving="1,2,3", new_members=None)
for i in range(100):
zk1.create(f"/test_two_{i}", b"somedata")
zk1.create(f"/test_two_{i}", "somedata")
zk2 = get_fake_zk(node2)
zk2 = create_client(node2)
zk2.sync("/test_two_0")
ku.wait_configs_equal(config, zk2)
zk3 = get_fake_zk(node3)
zk3 = create_client(node3)
zk3.sync("/test_two_0")
ku.wait_configs_equal(config, zk3)
for i in range(100):
assert zk2.exists(f"test_two_{i}") is not None
assert zk3.exists(f"test_two_{i}") is not None
assert zk2.exists(f"test_two_{i}")
assert zk3.exists(f"test_two_{i}")
config, _ = zk1.reconfig(joining=None, leaving="3", new_members=None)
config = config.decode("utf-8")
config = zk1.reconfig(joining=None, leaving="3", new_members=None)
print("After removing 3", config)
assert len(config.split("\n")) == 2
@ -90,35 +87,26 @@ def test_reconfig_remove_followers_from_3(started_cluster):
assert "node3" not in config
zk2.stop()
zk2.close()
zk2 = get_fake_zk(node2)
zk2 = create_client(node2)
ku.wait_configs_equal(config, zk2)
for i in range(100):
assert zk2.exists(f"test_two_{i}") is not None
zk2.create(f"/test_two_{100 + i}", b"otherdata")
assert zk2.exists(f"test_two_{i}")
zk2.create(f"/test_two_{100 + i}", "otherdata")
zk1.stop()
zk1.close()
zk1 = get_fake_zk(node1)
zk1 = create_client(node1)
zk1.sync("/test_two_0")
for i in range(200):
assert zk1.exists(f"test_two_{i}") is not None
with pytest.raises(Exception):
zk3.stop()
zk3.close()
zk3 = get_fake_zk(node3)
zk3.sync("/test_two_0")
assert zk1.exists(f"test_two_{i}")
assert node3.contains_in_log(log_msg_removed)
for i in range(100):
zk2.create(f"/test_two_{200 + i}", b"otherdata")
zk2.create(f"/test_two_{200 + i}", "otherdata")
config, _ = zk1.reconfig(joining=None, leaving="2", new_members=None)
config = config.decode("utf-8")
config = zk1.reconfig(joining=None, leaving="2", new_members=None)
print("After removing 2", config)
assert len(config.split("\n")) == 1
@ -127,19 +115,12 @@ def test_reconfig_remove_followers_from_3(started_cluster):
assert "node3" not in config
zk1.stop()
zk1.close()
zk1 = get_fake_zk(node1)
zk1 = create_client(node1)
zk1.sync("/test_two_0")
for i in range(300):
assert zk1.exists(f"test_two_{i}") is not None
with pytest.raises(Exception):
zk2.stop()
zk2.close()
zk2 = get_fake_zk(node2)
zk2.sync("/test_two_0")
assert zk1.exists(f"test_two_{i}")
assert not node1.contains_in_log(log_msg_removed)
assert node2.contains_in_log(log_msg_removed)
assert "Mode: standalone" in zk1.command(b"stat")
assert "Mode: standalone" in zk1.execute_query('stat')

View File

@ -1,11 +1,11 @@
#!/usr/bin/env python3
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.cluster import ClickHouseCluster, ClickHouseInstance
import helpers.keeper_utils as ku
import os
from kazoo.client import KazooClient, KazooState
from kazoo.exceptions import BadVersionException, BadArgumentsException
import typing as tp
cluster = ClickHouseCluster(__file__)
CONFIG_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "configs")
@ -26,49 +26,49 @@ def started_cluster():
cluster.start()
yield cluster
finally:
conn: tp.Optional[ku.KeeperClient]
for conn in [zk1, zk2, zk3, zk4, zk5]:
if conn:
conn.stop()
conn.close()
cluster.shutdown()
def get_fake_zk(node):
return ku.get_fake_zk(cluster, node)
def create_client(node: ClickHouseInstance):
return ku.KeeperClient(cluster.server_bin_path, cluster.get_instance_ip(node.name), 9181)
def test_reconfig_remove_2_and_leader(started_cluster):
"""
Remove 2 followers from a cluster of 5. Remove leader from 3 nodes.
"""
global zk1, zk2, zk3, zk4, zk5
zk1 = get_fake_zk(node1)
zk1 = create_client(node1)
config = ku.get_config_str(zk1)
print("Initial config", config)
assert len(config.split("\n")) == 5
for i in range(100):
zk1.create(f"/test_two_{i}", b"somedata")
zk1.create(f"/test_two_{i}", "somedata")
zk4 = get_fake_zk(node4)
zk4 = create_client(node4)
zk4.sync("/test_two_0")
ku.wait_configs_equal(config, zk4)
zk5 = get_fake_zk(node5)
zk5 = create_client(node5)
zk5.sync("/test_two_0")
ku.wait_configs_equal(config, zk5)
for i in range(100):
assert zk4.exists(f"test_two_{i}") is not None
assert zk5.exists(f"test_two_{i}") is not None
assert zk4.exists(f"test_two_{i}")
assert zk5.exists(f"test_two_{i}")
zk4.create(f"/test_two_{100 + i}", b"otherdata")
zk4.create(f"/test_two_{100 + i}", "otherdata")
zk2 = get_fake_zk(node2)
config, _ = zk2.reconfig(joining=None, leaving="4,5", new_members=None)
config = config.decode("utf-8")
zk2 = create_client(node2)
config = zk2.reconfig(joining=None, leaving="4,5", new_members=None)
print("After removing 4,5", config)
assert len(config.split("\n")) == 3
@ -79,27 +79,14 @@ def test_reconfig_remove_2_and_leader(started_cluster):
assert "node5" not in config
zk1.stop()
zk1.close()
zk1 = get_fake_zk(node1)
zk1 = create_client(node1)
zk1.sync("/test_two_0")
ku.wait_configs_equal(config, zk1)
for i in range(200):
assert zk1.exists(f"test_two_{i}") is not None
assert zk2.exists(f"test_two_{i}") is not None
with pytest.raises(Exception):
zk4.stop()
zk4.close()
zk4 = get_fake_zk(node4)
zk4.sync("/test_two_0")
with pytest.raises(Exception):
zk5.stop()
zk5.close()
zk5 = get_fake_zk(node5)
zk5.sync("/test_two_0")
assert zk1.exists(f"test_two_{i}")
assert zk2.exists(f"test_two_{i}")
assert not node1.contains_in_log(log_msg_removed)
assert not node2.contains_in_log(log_msg_removed)
@ -110,11 +97,10 @@ def test_reconfig_remove_2_and_leader(started_cluster):
assert ku.is_leader(cluster, node1)
for i in range(100):
zk1.create(f"/test_leader_{i}", b"somedata")
zk1.create(f"/test_leader_{i}", "somedata")
# when a leader gets a remove request, it must yield leadership
config, _ = zk1.reconfig(joining=None, leaving="1", new_members=None)
config = config.decode("utf-8")
config = zk1.reconfig(joining=None, leaving="1", new_members=None)
print("After removing 1 (leader)", config)
assert len(config.split("\n")) == 2
@ -125,24 +111,17 @@ def test_reconfig_remove_2_and_leader(started_cluster):
assert "node5" not in config
zk2.stop()
zk2.close()
zk2 = get_fake_zk(node2)
zk2 = create_client(node2)
zk2.sync("/test_leader_0")
ku.wait_configs_equal(config, zk2)
zk3 = get_fake_zk(node3)
zk3 = create_client(node3)
zk3.sync("/test_leader_0")
ku.wait_configs_equal(config, zk3)
for i in range(100):
assert zk2.exists(f"test_leader_{i}") is not None
assert zk3.exists(f"test_leader_{i}") is not None
with pytest.raises(Exception):
zk1.stop()
zk1.close()
zk1 = get_fake_zk(node1)
zk1.sync("/test_leader_0")
assert zk2.exists(f"test_leader_{i}")
assert zk3.exists(f"test_leader_{i}")
assert node1.contains_in_log(log_msg_removed)
assert not node2.contains_in_log(log_msg_removed)

View File

@ -1,11 +1,10 @@
#!/usr/bin/env python3
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.cluster import ClickHouseCluster, ClickHouseInstance
from os.path import join, dirname, realpath
import time
import helpers.keeper_utils as ku
from kazoo.client import KazooClient, KazooState
import typing as tp
cluster = ClickHouseCluster(__file__)
CONFIG_DIR = join(dirname(realpath(__file__)), "configs")
@ -31,24 +30,26 @@ def started_cluster():
yield cluster
finally:
conn: tp.Optional[ku.KeeperClient]
for conn in [zk1, zk2, zk3, zk4]:
if conn:
conn.stop()
conn.close()
cluster.shutdown()
def get_fake_zk(node):
return ku.get_fake_zk(cluster, node)
def create_client(node: ClickHouseInstance):
return ku.KeeperClient(
cluster.server_bin_path, cluster.get_instance_ip(node.name), 9181
)
def test_reconfig_replace_leader(started_cluster):
"""
Remove leader from a cluster of 3 and add a new node via two commands.
"""
zk1 = get_fake_zk(node1)
global zk1, zk2, zk3, zk4
zk1 = create_client(node1)
config = ku.get_config_str(zk1)
assert len(config.split("\n")) == 3
@ -58,23 +59,22 @@ def test_reconfig_replace_leader(started_cluster):
assert "node4" not in config
for i in range(100):
zk1.create(f"/test_four_{i}", b"somedata")
zk1.create(f"/test_four_{i}", "somedata")
zk2 = get_fake_zk(node2)
zk2 = create_client(node2)
zk2.sync("/test_four_0")
ku.wait_configs_equal(config, zk2)
zk3 = get_fake_zk(node3)
zk3 = create_client(node3)
zk3.sync("/test_four_0")
ku.wait_configs_equal(config, zk3)
for i in range(100):
assert zk2.exists(f"/test_four_{i}") is not None
assert zk3.exists(f"/test_four_{i}") is not None
assert zk2.exists(f"/test_four_{i}")
assert zk3.exists(f"/test_four_{i}")
assert ku.is_leader(cluster, node1)
config, _ = zk2.reconfig(joining=None, leaving="1", new_members=None)
config = config.decode("utf-8")
config = zk2.reconfig(joining=None, leaving="1", new_members=None)
print("After removing 1 (leader)", config)
assert len(config.split("\n")) == 2
@ -85,17 +85,10 @@ def test_reconfig_replace_leader(started_cluster):
ku.wait_configs_equal(config, zk2)
with pytest.raises(Exception):
zk1.stop()
zk1.close()
zk1 = get_fake_zk(node1)
zk1.sync("/test_four_0")
node4.start_clickhouse()
config, _ = zk2.reconfig(
config = zk2.reconfig(
joining="server.4=node4:9234", leaving=None, new_members=None
)
config = config.decode("utf-8")
ku.wait_until_connected(cluster, node4)
print("After adding 4", config)
@ -105,22 +98,20 @@ def test_reconfig_replace_leader(started_cluster):
assert "node3" in config
assert "node4" in config
zk4 = get_fake_zk(node4)
zk4 = create_client(node4)
ku.wait_configs_equal(config, zk4)
for i in range(100):
assert zk4.exists(f"test_four_{i}") is not None
zk4.create(f"/test_four_{100 + i}", b"somedata")
assert zk4.exists(f"test_four_{i}")
zk4.create(f"/test_four_{100 + i}", "somedata")
zk2.stop()
zk2.close()
zk2 = get_fake_zk(node2)
zk2 = create_client(node2)
zk2.sync("/test_four_0")
ku.wait_configs_equal(config, zk2)
zk3.stop()
zk3.close()
zk3 = get_fake_zk(node3)
zk3 = create_client(node3)
zk3.sync("/test_four_0")
ku.wait_configs_equal(config, zk3)

View File

@ -1,35 +0,0 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<enable_reconfiguration>true</enable_reconfiguration>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>9234</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -1,35 +0,0 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>2</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<enable_reconfiguration>true</enable_reconfiguration>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>9234</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -1,35 +0,0 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>3</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<enable_reconfiguration>true</enable_reconfiguration>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>9234</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
<start_as_follower>true</start_as_follower>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -1,21 +0,0 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>4</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<enable_reconfiguration>true</enable_reconfiguration>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server> <id>2</id> <hostname>node2</hostname> <port>9234</port> </server>
<server> <id>3</id> <hostname>node3</hostname> <port>9234</port> </server>
<server> <id>4</id> <hostname>node4</hostname> <port>9234</port> </server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -1,43 +0,0 @@
#!/usr/bin/env python3
import pytest
from helpers.cluster import ClickHouseCluster
from os.path import join, dirname, realpath
import time
import helpers.keeper_utils as ku
from kazoo.client import KazooClient, KazooState
cluster = ClickHouseCluster(__file__)
CONFIG_DIR = join(dirname(realpath(__file__)), "configs")
node1 = cluster.add_instance("node1", main_configs=["configs/keeper1.xml"])
node2 = cluster.add_instance("node2", main_configs=["configs/keeper2.xml"])
node3 = cluster.add_instance("node3", main_configs=["configs/keeper3.xml"])
node4 = cluster.add_instance("node4", stay_alive=True)
zk1, zk2, zk3, zk4 = None, None, None, None
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
node4.stop_clickhouse()
node4.copy_file_to_container(
join(CONFIG_DIR, "keeper4.xml"),
"/etc/clickhouse-server/config.d/keeper.xml",
)
yield cluster
finally:
for conn in [zk1, zk2, zk3, zk4]:
if conn:
conn.stop()
conn.close()
cluster.shutdown()
def get_fake_zk(node):
return ku.get_fake_zk(cluster, node)