mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-25 00:52:02 +00:00
More stable test_keeper_reconfig_replace_leader
This commit is contained in:
parent
352eca7aa0
commit
bb68321fc1
@ -37,39 +37,59 @@ class KeeperException(Exception):
|
||||
class KeeperClient(object):
|
||||
SEPARATOR = b"\a\a\a\a\n"
|
||||
|
||||
def __init__(self, bin_path: str, host: str, port: int):
|
||||
def __init__(self, bin_path: str, host: str, port: int, connection_tries=30):
|
||||
self.bin_path = bin_path
|
||||
self.host = host
|
||||
self.port = port
|
||||
|
||||
self.proc = subprocess.Popen(
|
||||
[
|
||||
bin_path,
|
||||
"keeper-client",
|
||||
"--host",
|
||||
host,
|
||||
"--port",
|
||||
str(port),
|
||||
"--log-level",
|
||||
"error",
|
||||
"--tests-mode",
|
||||
"--no-confirmation",
|
||||
],
|
||||
stdin=subprocess.PIPE,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
)
|
||||
retry_count = 0
|
||||
|
||||
self.poller = select.epoll()
|
||||
self.poller.register(self.proc.stdout)
|
||||
self.poller.register(self.proc.stderr)
|
||||
while True:
|
||||
try:
|
||||
self.proc = subprocess.Popen(
|
||||
[
|
||||
bin_path,
|
||||
"keeper-client",
|
||||
"--host",
|
||||
host,
|
||||
"--port",
|
||||
str(port),
|
||||
"--log-level",
|
||||
"error",
|
||||
"--tests-mode",
|
||||
"--no-confirmation",
|
||||
],
|
||||
stdin=subprocess.PIPE,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
)
|
||||
|
||||
self._fd_nums = {
|
||||
self.proc.stdout.fileno(): self.proc.stdout,
|
||||
self.proc.stderr.fileno(): self.proc.stderr,
|
||||
}
|
||||
self.poller = select.epoll()
|
||||
self.poller.register(self.proc.stdout)
|
||||
self.poller.register(self.proc.stderr)
|
||||
|
||||
self.stopped = False
|
||||
self._fd_nums = {
|
||||
self.proc.stdout.fileno(): self.proc.stdout,
|
||||
self.proc.stderr.fileno(): self.proc.stderr,
|
||||
}
|
||||
|
||||
self.stopped = False
|
||||
|
||||
self.get("/keeper", 60.0)
|
||||
break
|
||||
except Exception as e:
|
||||
retry_count += 1
|
||||
if (
|
||||
"All connection tries failed while connecting to ZooKeeper"
|
||||
in str(e)
|
||||
and retry_count < connection_tries
|
||||
):
|
||||
print(
|
||||
f"Got exception while connecting to Keeper: {e}\nWill reconnect, reconnect count = {retry_count}"
|
||||
)
|
||||
time.sleep(1)
|
||||
else:
|
||||
raise
|
||||
|
||||
def execute_query(self, query: str, timeout: float = 60.0) -> str:
|
||||
output = io.BytesIO()
|
||||
@ -94,7 +114,7 @@ class KeeperClient(object):
|
||||
output.write(chunk)
|
||||
|
||||
elif file == self.proc.stderr:
|
||||
assert self.proc.stdout.readline() == self.SEPARATOR
|
||||
self.proc.stdout.readline()
|
||||
raise KeeperException(self.proc.stderr.readline().strip().decode())
|
||||
|
||||
else:
|
||||
@ -221,13 +241,12 @@ NOT_SERVING_REQUESTS_ERROR_MSG = "This instance is not currently serving request
|
||||
|
||||
|
||||
def wait_until_connected(cluster, node, port=9181, timeout=30.0):
|
||||
elapsed = 0.0
|
||||
start = time.time()
|
||||
|
||||
while send_4lw_cmd(cluster, node, "mntr", port) == NOT_SERVING_REQUESTS_ERROR_MSG:
|
||||
time.sleep(0.1)
|
||||
elapsed += 0.1
|
||||
|
||||
if elapsed >= timeout:
|
||||
if time.time() - start > timeout:
|
||||
raise Exception(
|
||||
f"{timeout}s timeout while waiting for {node.name} to start serving requests"
|
||||
)
|
||||
@ -280,14 +299,16 @@ def wait_configs_equal(left_config: str, right_zk: KeeperClient, timeout: float
|
||||
Check whether get /keeper/config result in left_config is equal
|
||||
to get /keeper/config on right_zk ZK connection.
|
||||
"""
|
||||
elapsed: float = 0.0
|
||||
while sorted(left_config.split("\n")) != sorted(
|
||||
get_config_str(right_zk).split("\n")
|
||||
):
|
||||
start = time.time()
|
||||
left_config = sorted(left_config.split("\n"))
|
||||
while True:
|
||||
right_config = sorted(get_config_str(right_zk).split("\n"))
|
||||
if left_config == right_config:
|
||||
return
|
||||
|
||||
time.sleep(1)
|
||||
elapsed += 1
|
||||
if elapsed >= timeout:
|
||||
if time.time() - start > timeout:
|
||||
raise Exception(
|
||||
f"timeout while checking nodes configs to get equal. "
|
||||
f"Left: {left_config}, right: {get_config_str(right_zk)}"
|
||||
f"Left: {left_config}, right: {right_config}"
|
||||
)
|
||||
|
@ -3,6 +3,7 @@
|
||||
import pytest
|
||||
from helpers.cluster import ClickHouseCluster, ClickHouseInstance
|
||||
from os.path import join, dirname, realpath
|
||||
import time
|
||||
import helpers.keeper_utils as ku
|
||||
import typing as tp
|
||||
|
||||
@ -83,6 +84,12 @@ def test_reconfig_replace_leader(started_cluster):
|
||||
assert "node3" in config
|
||||
assert "node4" not in config
|
||||
|
||||
# wait until cluster stabilizes with a new leader
|
||||
while not ku.is_leader(started_cluster, node2) and not ku.is_leader(
|
||||
started_cluster, node3
|
||||
):
|
||||
time.sleep(1)
|
||||
|
||||
# additional 20s wait before removing leader
|
||||
ku.wait_configs_equal(config, zk2, timeout=50)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user