diff --git a/docs/en/operations/utilities/clickhouse-keeper-client.md b/docs/en/operations/utilities/clickhouse-keeper-client.md index 2140d22b620..9f24ba9587b 100644 --- a/docs/en/operations/utilities/clickhouse-keeper-client.md +++ b/docs/en/operations/utilities/clickhouse-keeper-client.md @@ -16,6 +16,8 @@ A client application to interact with clickhouse-keeper by its native protocol. - `--session-timeout=TIMEOUT` — Set session timeout in seconds. Default value: 10s. - `--operation-timeout=TIMEOUT` — Set operation timeout in seconds. Default value: 10s. - `--history-file=FILE_PATH` — Set path of history file. Default value: `~/.keeper-client-history`. +- `--log-level=LEVEL` — Set log level. Default value: `information`. +- `--no-confirmation` — If set, will not require a confirmation on several commands. Default value `false` for interactive and `true` for query - `--help` — Shows the help message. ## Example {#clickhouse-keeper-client-example} @@ -44,6 +46,7 @@ keeper foo bar - `ls [path]` -- Lists the nodes for the given path (default: cwd) - `cd [path]` -- Change the working path (default `.`) +- `exists ` -- Returns `1` if node exists, `0` otherwise - `set [version]` -- Updates the node's value. Only update if version matches (default: -1) - `create [mode]` -- Creates new node with the set value - `touch ` -- Creates new node with an empty string as value. Doesn't throw an exception if the node already exists @@ -56,3 +59,5 @@ keeper foo bar - `find_super_nodes [path]` -- Finds nodes with number of children larger than some threshold for the given path (default `.`) - `delete_stale_backups` -- Deletes ClickHouse nodes used for backups that are now inactive - `find_big_family [path] [n]` -- Returns the top n nodes with the biggest family in the subtree (default path = `.` and n = 10) +- `sync ` -- Synchronizes node between processes and leader +- `reconfig "" [version]` -- Reconfigures Keeper cluster. See https://clickhouse.com/docs/en/guides/sre/keeper/clickhouse-keeper#reconfiguration diff --git a/programs/keeper-client/Commands.cpp b/programs/keeper-client/Commands.cpp index e265d7f4858..65bc719f1be 100644 --- a/programs/keeper-client/Commands.cpp +++ b/programs/keeper-client/Commands.cpp @@ -158,6 +158,21 @@ void GetCommand::execute(const ASTKeeperQuery * query, KeeperClient * client) co std::cout << client->zookeeper->get(client->getAbsolutePath(query->args[0].safeGet())) << "\n"; } +bool ExistsCommand::parse(IParser::Pos & pos, std::shared_ptr & node, DB::Expected & expected) const +{ + String path; + if (!parseKeeperPath(pos, expected, path)) + return false; + node->args.push_back(std::move(path)); + + return true; +} + +void ExistsCommand::execute(const DB::ASTKeeperQuery * query, DB::KeeperClient * client) const +{ + std::cout << client->zookeeper->exists(client->getAbsolutePath(query->args[0].safeGet())) << "\n"; +} + bool GetStatCommand::parse(IParser::Pos & pos, std::shared_ptr & node, Expected & expected) const { String path; @@ -400,6 +415,21 @@ void ReconfigCommand::execute(const DB::ASTKeeperQuery * query, DB::KeeperClient std::cout << response.value << '\n'; } +bool SyncCommand::parse(IParser::Pos & pos, std::shared_ptr & node, DB::Expected & expected) const +{ + String path; + if (!parseKeeperPath(pos, expected, path)) + return false; + node->args.push_back(std::move(path)); + + return true; +} + +void SyncCommand::execute(const DB::ASTKeeperQuery * query, DB::KeeperClient * client) const +{ + std::cout << client->zookeeper->sync(client->getAbsolutePath(query->args[0].safeGet())) << "\n"; +} + bool HelpCommand::parse(IParser::Pos & /* pos */, std::shared_ptr & /* node */, Expected & /* expected */) const { return true; diff --git a/programs/keeper-client/Commands.h b/programs/keeper-client/Commands.h index c2f7bd9b896..19d5ee2a516 100644 --- a/programs/keeper-client/Commands.h +++ b/programs/keeper-client/Commands.h @@ -101,6 +101,17 @@ class GetCommand : public IKeeperClientCommand String getHelpMessage() const override { return "{} -- Returns the node's value"; } }; +class ExistsCommand : public IKeeperClientCommand +{ + String getName() const override { return "exists"; } + + bool parse(IParser::Pos & pos, std::shared_ptr & node, Expected & expected) const override; + + void execute(const ASTKeeperQuery * query, KeeperClient * client) const override; + + String getHelpMessage() const override { return "{} -- Returns `1` if node exists, `0` otherwise"; } +}; + class GetStatCommand : public IKeeperClientCommand { String getName() const override { return "get_stat"; } @@ -179,7 +190,8 @@ class RMRCommand : public IKeeperClientCommand class ReconfigCommand : public IKeeperClientCommand { - enum class Operation : Int64 { + enum class Operation : Int64 + { ADD = 0, REMOVE = 1, SET = 2, @@ -191,7 +203,18 @@ class ReconfigCommand : public IKeeperClientCommand void execute(const ASTKeeperQuery * query, KeeperClient * client) const override; - String getHelpMessage() const override { return "{} \"\" [version] -- Reconfigures a ZooKeeper cluster. See https://clickhouse.com/docs/en/guides/sre/keeper/clickhouse-keeper#reconfiguration"; } + String getHelpMessage() const override { return "{} \"\" [version] -- Reconfigures Keeper cluster. See https://clickhouse.com/docs/en/guides/sre/keeper/clickhouse-keeper#reconfiguration"; } +}; + +class SyncCommand: public IKeeperClientCommand +{ + String getName() const override { return "sync"; } + + bool parse(IParser::Pos & pos, std::shared_ptr & node, Expected & expected) const override; + + void execute(const ASTKeeperQuery * query, KeeperClient * client) const override; + + String getHelpMessage() const override { return "{} -- Synchronizes node between processes and leader"; } }; class HelpCommand : public IKeeperClientCommand diff --git a/programs/keeper-client/KeeperClient.cpp b/programs/keeper-client/KeeperClient.cpp index afaf94448d5..890b937e384 100644 --- a/programs/keeper-client/KeeperClient.cpp +++ b/programs/keeper-client/KeeperClient.cpp @@ -173,6 +173,14 @@ void KeeperClient::defineOptions(Poco::Util::OptionSet & options) Poco::Util::Option("log-level", "", "set log level") .argument("") .binding("log-level")); + + options.addOption( + Poco::Util::Option("no-confirmation", "", "if set, will not require a confirmation on several commands. default false for interactive and true for query") + .binding("no-confirmation")); + + options.addOption( + Poco::Util::Option("tests-mode", "", "run keeper-client in a special mode for tests. all commands output are separated by special symbols. default false") + .binding("tests-mode")); } void KeeperClient::initialize(Poco::Util::Application & /* self */) @@ -187,6 +195,7 @@ void KeeperClient::initialize(Poco::Util::Application & /* self */) std::make_shared(), std::make_shared(), std::make_shared(), + std::make_shared(), std::make_shared(), std::make_shared(), std::make_shared(), @@ -194,6 +203,7 @@ void KeeperClient::initialize(Poco::Util::Application & /* self */) std::make_shared(), std::make_shared(), std::make_shared(), + std::make_shared(), std::make_shared(), std::make_shared(), }); @@ -233,7 +243,6 @@ void KeeperClient::initialize(Poco::Util::Application & /* self */) EventNotifier::init(); } - bool KeeperClient::processQueryText(const String & text) { if (exit_strings.find(text) != exit_strings.end()) @@ -287,7 +296,7 @@ bool KeeperClient::processQueryText(const String & text) return true; } -void KeeperClient::runInteractive() +void KeeperClient::runInteractiveReplxx() { LineReader::Patterns query_extenders = {"\\"}; @@ -321,6 +330,26 @@ void KeeperClient::runInteractive() } } +void KeeperClient::runInteractiveInputStream() +{ + for (String input; std::getline(std::cin, input);) + { + if (!processQueryText(input)) + break; + + std::cout << "\a\a\a\a" << std::endl; + std::cerr << std::flush; + } +} + +void KeeperClient::runInteractive() +{ + if (config().hasOption("tests-mode")) + runInteractiveInputStream(); + else + runInteractiveReplxx(); +} + int KeeperClient::main(const std::vector & /* args */) { if (config().hasOption("help")) @@ -370,9 +399,11 @@ int KeeperClient::main(const std::vector & /* args */) zk_args.operation_timeout_ms = config().getInt("operation-timeout", 10) * 1000; zookeeper = std::make_unique(zk_args); + if (config().has("no-confirmation") || config().has("query")) + ask_confirmation = false; + if (config().has("query")) { - ask_confirmation = false; processQueryText(config().getString("query")); } else diff --git a/programs/keeper-client/KeeperClient.h b/programs/keeper-client/KeeperClient.h index 4a20a45de6e..0d3db3c2f02 100644 --- a/programs/keeper-client/KeeperClient.h +++ b/programs/keeper-client/KeeperClient.h @@ -49,6 +49,9 @@ public: protected: void runInteractive(); + void runInteractiveReplxx(); + void runInteractiveInputStream(); + bool processQueryText(const String & text); void loadCommands(std::vector && new_commands); diff --git a/tests/integration/helpers/keeper_utils.py b/tests/integration/helpers/keeper_utils.py index 93ea3fa74b7..a9ee3075750 100644 --- a/tests/integration/helpers/keeper_utils.py +++ b/tests/integration/helpers/keeper_utils.py @@ -1,6 +1,164 @@ +import io +import subprocess import socket import time +import typing as tp +import contextlib +import select from kazoo.client import KazooClient +from helpers.cluster import ClickHouseCluster, ClickHouseInstance +from helpers.client import CommandRequest + + +def execute_keeper_client_query(cluster: ClickHouseCluster, node: ClickHouseInstance, query: str) -> str: + request = CommandRequest( + [ + cluster.server_bin_path, + "keeper-client", + "--host", + str(cluster.get_instance_ip(node.name)), + "--port", + str(cluster.zookeeper_port), + "-q", + query, + ], + stdin="", + ) + + return request.get_answer() + + +class KeeperException(Exception): + pass + + +class KeeperClient(object): + + SEPARATOR = b'\a\a\a\a\n' + + def __init__(self, bin_path: str, host: str, port: int): + self.bin_path = bin_path + self.host = host + self.port = port + + self.proc = subprocess.Popen( + [ + bin_path, + 'keeper-client', + '--host', + host, + '--port', + str(port), + '--log-level', + 'error', + '--tests-mode', + '--no-confirmation', + ], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + + self.poller = select.epoll() + self.poller.register(self.proc.stdout) + self.poller.register(self.proc.stderr) + + self._fd_nums = { + self.proc.stdout.fileno(): self.proc.stdout, + self.proc.stderr.fileno(): self.proc.stderr, + } + + self.stopped = False + + def execute_query(self, query: str, timeout: float = 10.) -> str: + output = io.BytesIO() + + self.proc.stdin.write(query.encode() + b'\n') + self.proc.stdin.flush() + + events = self.poller.poll(timeout) + + for fd_num, event in events: + if event & (select.EPOLLIN | select.EPOLLPRI): + file = self._fd_nums[fd_num] + + if file == self.proc.stdout: + while True: + chunk = file.readline() + if chunk.endswith(self.SEPARATOR): + break + + output.write(chunk) + + elif file == self.proc.stderr: + assert self.proc.stdout.readline() == self.SEPARATOR + raise KeeperException(self.proc.stderr.readline().strip().decode()) + + else: + raise ValueError(f'Failed to read from pipe. Flag {event}') + + data = output.getvalue().strip().decode() + return data + + def cd(self, path: str, timeout: float = 10.): + self.execute_query(f'cd {path}', timeout) + + def ls(self, path: str, timeout: float = 10.) -> list[str]: + return self.execute_query(f'ls {path}', timeout).split(' ') + + def create(self, path: str, value: str, timeout: float = 10.): + self.execute_query(f'create {path} {value}', timeout) + + def get(self, path: str, timeout: float = 10.) -> str: + return self.execute_query(f'get {path}', timeout) + + def exists(self, path: str, timeout: float = 10.) -> bool: + return bool(int(self.execute_query(f'exists {path}', timeout))) + + def stop(self): + if not self.stopped: + self.stopped = True + self.proc.communicate(b'exit\n', timeout=10.) + + def sync(self, path: str, timeout: float = 10.): + self.execute_query(f'sync {path}', timeout) + + def touch(self, path: str, timeout: float = 10.): + self.execute_query(f'touch {path}', timeout) + + def find_big_family(self, path: str, n: int = 10, timeout: float = 10.) -> str: + return self.execute_query(f'find_big_family {path} {n}', timeout) + + def find_super_nodes(self, threshold: int, timeout: float = 10.) -> str: + return self.execute_query(f'find_super_nodes {threshold}', timeout) + + def delete_stale_backups(self, timeout: float = 10.) -> str: + return self.execute_query('delete_stale_backups', timeout) + + def reconfig(self, joining: tp.Optional[str], leaving: tp.Optional[str], new_members: tp.Optional[str], timeout: float = 10.) -> str: + if bool(joining) + bool(leaving) + bool(new_members) != 1: + raise ValueError('Exactly one of joining, leaving or new_members must be specified') + + if joining is not None: + operation = 'add' + elif leaving is not None: + operation = 'remove' + elif new_members is not None: + operation = 'set' + else: + raise ValueError('At least one of joining, leaving or new_members must be specified') + + return self.execute_query(f'reconfig {operation} {joining or leaving or new_members}', timeout) + + @classmethod + @contextlib.contextmanager + def from_cluster(cls, cluster: ClickHouseCluster, keeper_node: str, port: tp.Optional[int] = None) -> 'KeeperClient': + client = cls(cluster.server_bin_path, cluster.get_instance_ip(keeper_node), port or cluster.zookeeper_port) + + try: + yield client + finally: + client.stop() def get_keeper_socket(cluster, node, port=9181): @@ -70,14 +228,14 @@ def get_fake_zk(cluster, node, timeout: float = 30.0) -> KazooClient: return _fake -def get_config_str(zk: KazooClient) -> str: +def get_config_str(zk: KeeperClient) -> str: """ Return decoded contents of /keeper/config node """ - return zk.get("/keeper/config")[0].decode("utf-8") + return zk.get("/keeper/config") -def wait_configs_equal(left_config: str, right_zk: KazooClient, timeout: float = 30.0): +def wait_configs_equal(left_config: str, right_zk: KeeperClient, timeout: float = 30.0): """ Check whether get /keeper/config result in left_config is equal to get /keeper/config on right_zk ZK connection. diff --git a/tests/integration/test_keeper_client/test.py b/tests/integration/test_keeper_client/test.py index 8f7056a5afd..c82917372b8 100644 --- a/tests/integration/test_keeper_client/test.py +++ b/tests/integration/test_keeper_client/test.py @@ -1,7 +1,7 @@ import pytest -from helpers.client import CommandRequest from helpers.cluster import ClickHouseCluster from helpers.test_tools import TSV +from helpers.keeper_utils import KeeperClient cluster = ClickHouseCluster(__file__) @@ -24,39 +24,28 @@ def started_cluster(): cluster.shutdown() -def keeper_query(query: str): - return CommandRequest( - [ - cluster.server_bin_path, - "keeper-client", - "--host", - str(cluster.get_instance_ip("zoo1")), - "--port", - str(cluster.zookeeper_port), - "-q", - query, - ], - stdin="", - ) +@pytest.fixture(scope="function") +def client(started_cluster): + with KeeperClient.from_cluster(cluster, "zoo1") as keeper_client: + yield keeper_client -def test_big_family(): - command = keeper_query( - "touch test_big_family;" - "touch test_big_family/1;" - "touch test_big_family/1/1;" - "touch test_big_family/1/2;" - "touch test_big_family/1/3;" - "touch test_big_family/1/4;" - "touch test_big_family/1/5;" - "touch test_big_family/2;" - "touch test_big_family/2/1;" - "touch test_big_family/2/2;" - "touch test_big_family/2/3;" - "find_big_family test_big_family;" - ) +def test_big_family(client: KeeperClient): + client.touch("/test_big_family") + client.touch("/test_big_family/1") + client.touch("/test_big_family/1/1") + client.touch("/test_big_family/1/2") + client.touch("/test_big_family/1/3") + client.touch("/test_big_family/1/4") + client.touch("/test_big_family/1/5") + client.touch("/test_big_family/2") + client.touch("/test_big_family/2/1") + client.touch("/test_big_family/2/2") + client.touch("/test_big_family/2/3") - assert command.get_answer() == TSV( + response = client.find_big_family("/test_big_family") + + assert response == TSV( [ ["/test_big_family/1", "5"], ["/test_big_family/2", "3"], @@ -71,34 +60,33 @@ def test_big_family(): ] ) - command = keeper_query("find_big_family test_big_family 1;") + response = client.find_big_family("/test_big_family", 1) - assert command.get_answer() == TSV( + assert response == TSV( [ ["/test_big_family/1", "5"], ] ) -def test_find_super_nodes(): - command = keeper_query( - "touch test_find_super_nodes;" - "touch test_find_super_nodes/1;" - "touch test_find_super_nodes/1/1;" - "touch test_find_super_nodes/1/2;" - "touch test_find_super_nodes/1/3;" - "touch test_find_super_nodes/1/4;" - "touch test_find_super_nodes/1/5;" - "touch test_find_super_nodes/2;" - "touch test_find_super_nodes/2/1;" - "touch test_find_super_nodes/2/2;" - "touch test_find_super_nodes/2/3;" - "touch test_find_super_nodes/2/4;" - "cd test_find_super_nodes;" - "find_super_nodes 4;" - ) +def test_find_super_nodes(client: KeeperClient): + client.touch("/test_find_super_nodes") + client.touch("/test_find_super_nodes/1") + client.touch("/test_find_super_nodes/1/1") + client.touch("/test_find_super_nodes/1/2") + client.touch("/test_find_super_nodes/1/3") + client.touch("/test_find_super_nodes/1/4") + client.touch("/test_find_super_nodes/1/5") + client.touch("/test_find_super_nodes/2") + client.touch("/test_find_super_nodes/2/1") + client.touch("/test_find_super_nodes/2/2") + client.touch("/test_find_super_nodes/2/3") + client.touch("/test_find_super_nodes/2/4") - assert command.get_answer() == TSV( + client.cd("/test_find_super_nodes") + + response = client.find_super_nodes(4) + assert response == TSV( [ ["/test_find_super_nodes/1", "5"], ["/test_find_super_nodes/2", "4"], @@ -106,41 +94,38 @@ def test_find_super_nodes(): ) -def test_delete_stale_backups(): - command = keeper_query( - "touch /clickhouse;" - "touch /clickhouse/backups;" - "touch /clickhouse/backups/1;" - "touch /clickhouse/backups/1/stage;" - "touch /clickhouse/backups/1/stage/alive123;" - "touch /clickhouse/backups/2;" - "touch /clickhouse/backups/2/stage;" - "touch /clickhouse/backups/2/stage/dead123;" - "delete_stale_backups;" - "y;" - "ls clickhouse/backups;" - ) +def test_delete_stale_backups(client: KeeperClient): + client.touch("/clickhouse") + client.touch("/clickhouse/backups") + client.touch("/clickhouse/backups/1") + client.touch("/clickhouse/backups/1/stage") + client.touch("/clickhouse/backups/1/stage/alive123") + client.touch("/clickhouse/backups/2") + client.touch("/clickhouse/backups/2/stage") + client.touch("/clickhouse/backups/2/stage/dead123") - assert command.get_answer() == ( - "You are going to delete all inactive backups in /clickhouse/backups. Continue?\n" + response = client.delete_stale_backups() + + assert response == ( 'Found backup "/clickhouse/backups/1", checking if it\'s active\n' 'Backup "/clickhouse/backups/1" is active, not going to delete\n' 'Found backup "/clickhouse/backups/2", checking if it\'s active\n' - 'Backup "/clickhouse/backups/2" is not active, deleting it\n' - "1\n" + 'Backup "/clickhouse/backups/2" is not active, deleting it' ) - -def test_base_commands(): - command = keeper_query( - "create test_create_zk_node1 testvalue1;" - "create test_create_zk_node_2 testvalue2;" - "get test_create_zk_node1;" - ) - - assert command.get_answer() == "testvalue1\n" + assert client.ls("/clickhouse/backups") == ["1"] -def test_four_letter_word_commands(): - command = keeper_query("ruok") - assert command.get_answer() == "imok\n" +def test_base_commands(client: KeeperClient): + client.create("/test_create_zk_node1", "testvalue1") + client.create("/test_create_zk_node_2", "testvalue2") + assert client.get("/test_create_zk_node1") == "testvalue1" + + client.create("/123", "1=2") + client.create("/123/321", "'foo;bar'") + assert client.get("/123") == "1=2" + assert client.get("/123/321") == "foo;bar" + + +def test_four_letter_word_commands(client: KeeperClient): + assert client.execute_query("ruok") == "imok" diff --git a/tests/integration/test_keeper_reconfig_add/test.py b/tests/integration/test_keeper_reconfig_add/test.py index 2c2da7403a1..6e05766c271 100644 --- a/tests/integration/test_keeper_reconfig_add/test.py +++ b/tests/integration/test_keeper_reconfig_add/test.py @@ -1,11 +1,10 @@ #!/usr/bin/env python3 import pytest -from helpers.cluster import ClickHouseCluster +from helpers.cluster import ClickHouseCluster, ClickHouseInstance import helpers.keeper_utils as ku import os -from kazoo.client import KazooClient -from kazoo.exceptions import BadArgumentsException +import typing as tp cluster = ClickHouseCluster(__file__) CONFIG_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "configs") @@ -19,11 +18,7 @@ part_of_cluster = "now this node is the part of cluster" zk1, zk2, zk3 = None, None, None -def get_fake_zk(node): - return ku.get_fake_zk(cluster, node) - - -@pytest.fixture(scope="module") +@pytest.fixture(scope="module", autouse=True) def started_cluster(): try: cluster.start() @@ -43,21 +38,26 @@ def started_cluster(): yield cluster finally: + conn: tp.Optional[ku.KeeperClient] for conn in [zk1, zk2, zk3]: - if conn: + if conn is not None: conn.stop() - conn.close() cluster.shutdown() -def test_reconfig_add(started_cluster): +def create_client(node: ClickHouseInstance): + return ku.KeeperClient(cluster.server_bin_path, cluster.get_instance_ip(node.name), 9181) + + +def test_reconfig_add(): """ Add a node to another node. Then add another node to two. """ + global zk1, zk2, zk3 + zk1 = create_client(node1) - zk1 = get_fake_zk(node1) - config = ku.get_config_str(zk1) + config = zk1.get("/keeper/config") print("Initial config", config) assert len(config.split("\n")) == 1 @@ -65,24 +65,22 @@ def test_reconfig_add(started_cluster): assert "node2" not in config assert "node3" not in config - with pytest.raises(BadArgumentsException): + with pytest.raises(ku.KeeperException): # duplicate id with different endpoint zk1.reconfig(joining="server.1=localhost:1337", leaving=None, new_members=None) - with pytest.raises(BadArgumentsException): + with pytest.raises(ku.KeeperException): # duplicate endpoint zk1.reconfig(joining="server.8=node1:9234", leaving=None, new_members=None) for i in range(100): - zk1.create(f"/test_three_{i}", b"somedata") + zk1.create(f"/test_three_{i}", "somedata") node2.start_clickhouse() - config, _ = zk1.reconfig( + config = zk1.reconfig( joining="server.2=node2:9234", leaving=None, new_members=None ) ku.wait_until_connected(cluster, node2) - - config = config.decode("utf-8") print("After adding 2", config) assert len(config.split("\n")) == 2 @@ -90,12 +88,12 @@ def test_reconfig_add(started_cluster): assert "node2" in config assert "node3" not in config - zk2 = get_fake_zk(node2) + zk2 = create_client(node2) ku.wait_configs_equal(config, zk2) for i in range(100): - assert zk2.exists(f"/test_three_{i}") is not None - zk2.create(f"/test_three_{100 + i}", b"somedata") + assert zk2.exists(f"/test_three_{i}") + zk2.create(f"/test_three_{100 + i}", "somedata") # Why not both? # One node will process add_srv request, other will pull out updated config, apply @@ -107,23 +105,21 @@ def test_reconfig_add(started_cluster): assert node2.contains_in_log(part_of_cluster) zk1.stop() - zk1.close() - zk1 = get_fake_zk(node1) + zk1 = create_client(node1) zk1.sync("/test_three_0") for i in range(200): - assert zk1.exists(f"/test_three_{i}") is not None + assert zk1.exists(f"/test_three_{i}") for i in range(100): - zk2.create(f"/test_four_{i}", b"somedata") + zk2.create(f"/test_four_{i}", "somedata") node3.start_clickhouse() - config, _ = zk2.reconfig( + config = zk2.reconfig( joining="server.3=node3:9234", leaving=None, new_members=None ) ku.wait_until_connected(cluster, node3) - config = config.decode("utf-8") print("After adding 3", config) assert len(config.split("\n")) == 3 @@ -131,25 +127,23 @@ def test_reconfig_add(started_cluster): assert "node2" in config assert "node3" in config - zk3 = get_fake_zk(node3) + zk3 = create_client(node3) ku.wait_configs_equal(config, zk3) for i in range(100): - assert zk3.exists(f"/test_four_{i}") is not None - zk3.create(f"/test_four_{100 + i}", b"somedata") + assert zk3.exists(f"/test_four_{i}") + zk3.create(f"/test_four_{100 + i}", "somedata") zk1.stop() - zk1.close() - zk1 = get_fake_zk(node1) + zk1 = create_client(node1) zk1.sync("/test_four_0") zk2.stop() - zk2.close() - zk2 = get_fake_zk(node2) + zk2 = create_client(node2) zk2.sync("/test_four_0") for i in range(200): - assert zk1.exists(f"/test_four_{i}") is not None - assert zk2.exists(f"/test_four_{i}") is not None + assert zk1.exists(f"/test_four_{i}") + assert zk2.exists(f"/test_four_{i}") assert node3.contains_in_log(part_of_cluster) diff --git a/tests/integration/test_keeper_reconfig_remove/test.py b/tests/integration/test_keeper_reconfig_remove/test.py index fb0a9472df3..daab94c59c4 100644 --- a/tests/integration/test_keeper_reconfig_remove/test.py +++ b/tests/integration/test_keeper_reconfig_remove/test.py @@ -1,11 +1,11 @@ #!/usr/bin/env python3 +import subprocess import pytest -from helpers.cluster import ClickHouseCluster +from helpers.cluster import ClickHouseCluster, ClickHouseInstance import helpers.keeper_utils as ku import os -from kazoo.client import KazooClient -from kazoo.exceptions import BadVersionException, BadArgumentsException +import typing as tp cluster = ClickHouseCluster(__file__) CONFIG_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "configs") @@ -23,16 +23,16 @@ def started_cluster(): cluster.start() yield cluster finally: + conn: tp.Optional[ku.KeeperClient] for conn in [zk1, zk2, zk3]: if conn: conn.stop() - conn.close() cluster.shutdown() -def get_fake_zk(node): - return ku.get_fake_zk(cluster, node) +def create_client(node: ClickHouseInstance): + return ku.KeeperClient(cluster.server_bin_path, cluster.get_instance_ip(node.name), 9181) def test_reconfig_remove_followers_from_3(started_cluster): @@ -42,9 +42,9 @@ def test_reconfig_remove_followers_from_3(started_cluster): Check that remaining node is in standalone mode. """ - zk1 = get_fake_zk(node1) - config, _ = zk1.get("/keeper/config") - config = config.decode("utf-8") + global zk1, zk2, zk3 + zk1 = create_client(node1) + config = zk1.get("/keeper/config") print("Initial config", config) assert len(config.split("\n")) == 3 @@ -52,36 +52,33 @@ def test_reconfig_remove_followers_from_3(started_cluster): assert "node2" in config assert "node3" in config - with pytest.raises(BadVersionException): - zk1.reconfig(joining=None, leaving="1", new_members=None, from_config=20) - with pytest.raises(BadArgumentsException): + with pytest.raises(ValueError): zk1.reconfig(joining=None, leaving=None, new_members=None) - with pytest.raises(BadArgumentsException): + with pytest.raises(ku.KeeperException): # bulk reconfiguration is not supported zk1.reconfig(joining=None, leaving=None, new_members="3") - with pytest.raises(BadArgumentsException): + with pytest.raises(ValueError): zk1.reconfig(joining="1", leaving="1", new_members="3") - with pytest.raises(BadArgumentsException): + with pytest.raises(ku.KeeperException): # at least one node must be left zk1.reconfig(joining=None, leaving="1,2,3", new_members=None) for i in range(100): - zk1.create(f"/test_two_{i}", b"somedata") + zk1.create(f"/test_two_{i}", "somedata") - zk2 = get_fake_zk(node2) + zk2 = create_client(node2) zk2.sync("/test_two_0") ku.wait_configs_equal(config, zk2) - zk3 = get_fake_zk(node3) + zk3 = create_client(node3) zk3.sync("/test_two_0") ku.wait_configs_equal(config, zk3) for i in range(100): - assert zk2.exists(f"test_two_{i}") is not None - assert zk3.exists(f"test_two_{i}") is not None + assert zk2.exists(f"test_two_{i}") + assert zk3.exists(f"test_two_{i}") - config, _ = zk1.reconfig(joining=None, leaving="3", new_members=None) - config = config.decode("utf-8") + config = zk1.reconfig(joining=None, leaving="3", new_members=None) print("After removing 3", config) assert len(config.split("\n")) == 2 @@ -90,35 +87,26 @@ def test_reconfig_remove_followers_from_3(started_cluster): assert "node3" not in config zk2.stop() - zk2.close() - zk2 = get_fake_zk(node2) + zk2 = create_client(node2) ku.wait_configs_equal(config, zk2) for i in range(100): - assert zk2.exists(f"test_two_{i}") is not None - zk2.create(f"/test_two_{100 + i}", b"otherdata") + assert zk2.exists(f"test_two_{i}") + zk2.create(f"/test_two_{100 + i}", "otherdata") zk1.stop() - zk1.close() - zk1 = get_fake_zk(node1) + zk1 = create_client(node1) zk1.sync("/test_two_0") for i in range(200): - assert zk1.exists(f"test_two_{i}") is not None - - with pytest.raises(Exception): - zk3.stop() - zk3.close() - zk3 = get_fake_zk(node3) - zk3.sync("/test_two_0") + assert zk1.exists(f"test_two_{i}") assert node3.contains_in_log(log_msg_removed) for i in range(100): - zk2.create(f"/test_two_{200 + i}", b"otherdata") + zk2.create(f"/test_two_{200 + i}", "otherdata") - config, _ = zk1.reconfig(joining=None, leaving="2", new_members=None) - config = config.decode("utf-8") + config = zk1.reconfig(joining=None, leaving="2", new_members=None) print("After removing 2", config) assert len(config.split("\n")) == 1 @@ -127,19 +115,12 @@ def test_reconfig_remove_followers_from_3(started_cluster): assert "node3" not in config zk1.stop() - zk1.close() - zk1 = get_fake_zk(node1) + zk1 = create_client(node1) zk1.sync("/test_two_0") for i in range(300): - assert zk1.exists(f"test_two_{i}") is not None - - with pytest.raises(Exception): - zk2.stop() - zk2.close() - zk2 = get_fake_zk(node2) - zk2.sync("/test_two_0") + assert zk1.exists(f"test_two_{i}") assert not node1.contains_in_log(log_msg_removed) assert node2.contains_in_log(log_msg_removed) - assert "Mode: standalone" in zk1.command(b"stat") + assert "Mode: standalone" in zk1.execute_query('stat') diff --git a/tests/integration/test_keeper_reconfig_remove_many/test.py b/tests/integration/test_keeper_reconfig_remove_many/test.py index ec0d8b95eff..e99327e1098 100644 --- a/tests/integration/test_keeper_reconfig_remove_many/test.py +++ b/tests/integration/test_keeper_reconfig_remove_many/test.py @@ -1,11 +1,11 @@ #!/usr/bin/env python3 import pytest -from helpers.cluster import ClickHouseCluster +from helpers.cluster import ClickHouseCluster, ClickHouseInstance import helpers.keeper_utils as ku import os -from kazoo.client import KazooClient, KazooState -from kazoo.exceptions import BadVersionException, BadArgumentsException +import typing as tp + cluster = ClickHouseCluster(__file__) CONFIG_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), "configs") @@ -26,49 +26,49 @@ def started_cluster(): cluster.start() yield cluster finally: + conn: tp.Optional[ku.KeeperClient] for conn in [zk1, zk2, zk3, zk4, zk5]: if conn: conn.stop() - conn.close() cluster.shutdown() -def get_fake_zk(node): - return ku.get_fake_zk(cluster, node) +def create_client(node: ClickHouseInstance): + return ku.KeeperClient(cluster.server_bin_path, cluster.get_instance_ip(node.name), 9181) def test_reconfig_remove_2_and_leader(started_cluster): """ Remove 2 followers from a cluster of 5. Remove leader from 3 nodes. """ + global zk1, zk2, zk3, zk4, zk5 - zk1 = get_fake_zk(node1) + zk1 = create_client(node1) config = ku.get_config_str(zk1) print("Initial config", config) assert len(config.split("\n")) == 5 for i in range(100): - zk1.create(f"/test_two_{i}", b"somedata") + zk1.create(f"/test_two_{i}", "somedata") - zk4 = get_fake_zk(node4) + zk4 = create_client(node4) zk4.sync("/test_two_0") ku.wait_configs_equal(config, zk4) - zk5 = get_fake_zk(node5) + zk5 = create_client(node5) zk5.sync("/test_two_0") ku.wait_configs_equal(config, zk5) for i in range(100): - assert zk4.exists(f"test_two_{i}") is not None - assert zk5.exists(f"test_two_{i}") is not None + assert zk4.exists(f"test_two_{i}") + assert zk5.exists(f"test_two_{i}") - zk4.create(f"/test_two_{100 + i}", b"otherdata") + zk4.create(f"/test_two_{100 + i}", "otherdata") - zk2 = get_fake_zk(node2) - config, _ = zk2.reconfig(joining=None, leaving="4,5", new_members=None) - config = config.decode("utf-8") + zk2 = create_client(node2) + config = zk2.reconfig(joining=None, leaving="4,5", new_members=None) print("After removing 4,5", config) assert len(config.split("\n")) == 3 @@ -79,27 +79,14 @@ def test_reconfig_remove_2_and_leader(started_cluster): assert "node5" not in config zk1.stop() - zk1.close() - zk1 = get_fake_zk(node1) + zk1 = create_client(node1) zk1.sync("/test_two_0") ku.wait_configs_equal(config, zk1) for i in range(200): - assert zk1.exists(f"test_two_{i}") is not None - assert zk2.exists(f"test_two_{i}") is not None - - with pytest.raises(Exception): - zk4.stop() - zk4.close() - zk4 = get_fake_zk(node4) - zk4.sync("/test_two_0") - - with pytest.raises(Exception): - zk5.stop() - zk5.close() - zk5 = get_fake_zk(node5) - zk5.sync("/test_two_0") + assert zk1.exists(f"test_two_{i}") + assert zk2.exists(f"test_two_{i}") assert not node1.contains_in_log(log_msg_removed) assert not node2.contains_in_log(log_msg_removed) @@ -110,11 +97,10 @@ def test_reconfig_remove_2_and_leader(started_cluster): assert ku.is_leader(cluster, node1) for i in range(100): - zk1.create(f"/test_leader_{i}", b"somedata") + zk1.create(f"/test_leader_{i}", "somedata") # when a leader gets a remove request, it must yield leadership - config, _ = zk1.reconfig(joining=None, leaving="1", new_members=None) - config = config.decode("utf-8") + config = zk1.reconfig(joining=None, leaving="1", new_members=None) print("After removing 1 (leader)", config) assert len(config.split("\n")) == 2 @@ -125,24 +111,17 @@ def test_reconfig_remove_2_and_leader(started_cluster): assert "node5" not in config zk2.stop() - zk2.close() - zk2 = get_fake_zk(node2) + zk2 = create_client(node2) zk2.sync("/test_leader_0") ku.wait_configs_equal(config, zk2) - zk3 = get_fake_zk(node3) + zk3 = create_client(node3) zk3.sync("/test_leader_0") ku.wait_configs_equal(config, zk3) for i in range(100): - assert zk2.exists(f"test_leader_{i}") is not None - assert zk3.exists(f"test_leader_{i}") is not None - - with pytest.raises(Exception): - zk1.stop() - zk1.close() - zk1 = get_fake_zk(node1) - zk1.sync("/test_leader_0") + assert zk2.exists(f"test_leader_{i}") + assert zk3.exists(f"test_leader_{i}") assert node1.contains_in_log(log_msg_removed) assert not node2.contains_in_log(log_msg_removed) diff --git a/tests/integration/test_keeper_reconfig_replace_leader/test.py b/tests/integration/test_keeper_reconfig_replace_leader/test.py index ef1d5394b67..bbb5b66eaa5 100644 --- a/tests/integration/test_keeper_reconfig_replace_leader/test.py +++ b/tests/integration/test_keeper_reconfig_replace_leader/test.py @@ -1,11 +1,10 @@ #!/usr/bin/env python3 import pytest -from helpers.cluster import ClickHouseCluster +from helpers.cluster import ClickHouseCluster, ClickHouseInstance from os.path import join, dirname, realpath -import time import helpers.keeper_utils as ku -from kazoo.client import KazooClient, KazooState +import typing as tp cluster = ClickHouseCluster(__file__) CONFIG_DIR = join(dirname(realpath(__file__)), "configs") @@ -31,24 +30,26 @@ def started_cluster(): yield cluster finally: + conn: tp.Optional[ku.KeeperClient] for conn in [zk1, zk2, zk3, zk4]: if conn: conn.stop() - conn.close() cluster.shutdown() -def get_fake_zk(node): - return ku.get_fake_zk(cluster, node) +def create_client(node: ClickHouseInstance): + return ku.KeeperClient( + cluster.server_bin_path, cluster.get_instance_ip(node.name), 9181 + ) def test_reconfig_replace_leader(started_cluster): """ Remove leader from a cluster of 3 and add a new node via two commands. """ - - zk1 = get_fake_zk(node1) + global zk1, zk2, zk3, zk4 + zk1 = create_client(node1) config = ku.get_config_str(zk1) assert len(config.split("\n")) == 3 @@ -58,23 +59,22 @@ def test_reconfig_replace_leader(started_cluster): assert "node4" not in config for i in range(100): - zk1.create(f"/test_four_{i}", b"somedata") + zk1.create(f"/test_four_{i}", "somedata") - zk2 = get_fake_zk(node2) + zk2 = create_client(node2) zk2.sync("/test_four_0") ku.wait_configs_equal(config, zk2) - zk3 = get_fake_zk(node3) + zk3 = create_client(node3) zk3.sync("/test_four_0") ku.wait_configs_equal(config, zk3) for i in range(100): - assert zk2.exists(f"/test_four_{i}") is not None - assert zk3.exists(f"/test_four_{i}") is not None + assert zk2.exists(f"/test_four_{i}") + assert zk3.exists(f"/test_four_{i}") assert ku.is_leader(cluster, node1) - config, _ = zk2.reconfig(joining=None, leaving="1", new_members=None) - config = config.decode("utf-8") + config = zk2.reconfig(joining=None, leaving="1", new_members=None) print("After removing 1 (leader)", config) assert len(config.split("\n")) == 2 @@ -85,17 +85,10 @@ def test_reconfig_replace_leader(started_cluster): ku.wait_configs_equal(config, zk2) - with pytest.raises(Exception): - zk1.stop() - zk1.close() - zk1 = get_fake_zk(node1) - zk1.sync("/test_four_0") - node4.start_clickhouse() - config, _ = zk2.reconfig( + config = zk2.reconfig( joining="server.4=node4:9234", leaving=None, new_members=None ) - config = config.decode("utf-8") ku.wait_until_connected(cluster, node4) print("After adding 4", config) @@ -105,22 +98,20 @@ def test_reconfig_replace_leader(started_cluster): assert "node3" in config assert "node4" in config - zk4 = get_fake_zk(node4) + zk4 = create_client(node4) ku.wait_configs_equal(config, zk4) for i in range(100): - assert zk4.exists(f"test_four_{i}") is not None - zk4.create(f"/test_four_{100 + i}", b"somedata") + assert zk4.exists(f"test_four_{i}") + zk4.create(f"/test_four_{100 + i}", "somedata") zk2.stop() - zk2.close() - zk2 = get_fake_zk(node2) + zk2 = create_client(node2) zk2.sync("/test_four_0") ku.wait_configs_equal(config, zk2) zk3.stop() - zk3.close() - zk3 = get_fake_zk(node3) + zk3 = create_client(node3) zk3.sync("/test_four_0") ku.wait_configs_equal(config, zk3) diff --git a/tests/integration/test_keeper_reconfig_replace_leader_in_one_command/__init__.py b/tests/integration/test_keeper_reconfig_replace_leader_in_one_command/__init__.py deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/tests/integration/test_keeper_reconfig_replace_leader_in_one_command/configs/keeper1.xml b/tests/integration/test_keeper_reconfig_replace_leader_in_one_command/configs/keeper1.xml deleted file mode 100644 index 71f3403aca3..00000000000 --- a/tests/integration/test_keeper_reconfig_replace_leader_in_one_command/configs/keeper1.xml +++ /dev/null @@ -1,35 +0,0 @@ - - - 9181 - 1 - /var/lib/clickhouse/coordination/log - /var/lib/clickhouse/coordination/snapshots - true - - - 5000 - 10000 - trace - - - - - 1 - node1 - 9234 - - - 2 - node2 - 9234 - true - - - 3 - node3 - 9234 - true - - - - diff --git a/tests/integration/test_keeper_reconfig_replace_leader_in_one_command/configs/keeper2.xml b/tests/integration/test_keeper_reconfig_replace_leader_in_one_command/configs/keeper2.xml deleted file mode 100644 index faefb4d1102..00000000000 --- a/tests/integration/test_keeper_reconfig_replace_leader_in_one_command/configs/keeper2.xml +++ /dev/null @@ -1,35 +0,0 @@ - - - 9181 - 2 - /var/lib/clickhouse/coordination/log - /var/lib/clickhouse/coordination/snapshots - true - - - 5000 - 10000 - trace - - - - - 1 - node1 - 9234 - - - 2 - node2 - 9234 - true - - - 3 - node3 - 9234 - true - - - - diff --git a/tests/integration/test_keeper_reconfig_replace_leader_in_one_command/configs/keeper3.xml b/tests/integration/test_keeper_reconfig_replace_leader_in_one_command/configs/keeper3.xml deleted file mode 100644 index 80a9caa92c2..00000000000 --- a/tests/integration/test_keeper_reconfig_replace_leader_in_one_command/configs/keeper3.xml +++ /dev/null @@ -1,35 +0,0 @@ - - - 9181 - 3 - /var/lib/clickhouse/coordination/log - /var/lib/clickhouse/coordination/snapshots - true - - - 5000 - 10000 - trace - - - - - 1 - node1 - 9234 - - - 2 - node2 - 9234 - true - - - 3 - node3 - 9234 - true - - - - diff --git a/tests/integration/test_keeper_reconfig_replace_leader_in_one_command/configs/keeper4.xml b/tests/integration/test_keeper_reconfig_replace_leader_in_one_command/configs/keeper4.xml deleted file mode 100644 index 9fd88fe5d63..00000000000 --- a/tests/integration/test_keeper_reconfig_replace_leader_in_one_command/configs/keeper4.xml +++ /dev/null @@ -1,21 +0,0 @@ - - - 9181 - 4 - /var/lib/clickhouse/coordination/log - /var/lib/clickhouse/coordination/snapshots - true - - - 5000 - 10000 - trace - - - - 2 node2 9234 - 3 node3 9234 - 4 node4 9234 - - - diff --git a/tests/integration/test_keeper_reconfig_replace_leader_in_one_command/test.py b/tests/integration/test_keeper_reconfig_replace_leader_in_one_command/test.py deleted file mode 100644 index b099d0513e1..00000000000 --- a/tests/integration/test_keeper_reconfig_replace_leader_in_one_command/test.py +++ /dev/null @@ -1,43 +0,0 @@ -#!/usr/bin/env python3 - -import pytest -from helpers.cluster import ClickHouseCluster -from os.path import join, dirname, realpath -import time -import helpers.keeper_utils as ku -from kazoo.client import KazooClient, KazooState - -cluster = ClickHouseCluster(__file__) -CONFIG_DIR = join(dirname(realpath(__file__)), "configs") - -node1 = cluster.add_instance("node1", main_configs=["configs/keeper1.xml"]) -node2 = cluster.add_instance("node2", main_configs=["configs/keeper2.xml"]) -node3 = cluster.add_instance("node3", main_configs=["configs/keeper3.xml"]) -node4 = cluster.add_instance("node4", stay_alive=True) -zk1, zk2, zk3, zk4 = None, None, None, None - - -@pytest.fixture(scope="module") -def started_cluster(): - try: - cluster.start() - - node4.stop_clickhouse() - node4.copy_file_to_container( - join(CONFIG_DIR, "keeper4.xml"), - "/etc/clickhouse-server/config.d/keeper.xml", - ) - - yield cluster - - finally: - for conn in [zk1, zk2, zk3, zk4]: - if conn: - conn.stop() - conn.close() - - cluster.shutdown() - - -def get_fake_zk(node): - return ku.get_fake_zk(cluster, node)