import contextlib import grpc import psycopg2 import pymysql.connections import pymysql.err import pytest import sys import time from helpers.cluster import ClickHouseCluster, run_and_check from helpers.client import Client, QueryRuntimeException from kazoo.exceptions import NodeExistsError from pathlib import Path from requests.exceptions import ConnectionError from urllib3.util.retry import Retry cluster = ClickHouseCluster(__file__) instance = cluster.add_instance( "instance", main_configs=[ "configs/ports_from_zk.xml", "configs/ssl_conf.xml", "configs/dhparam.pem", "configs/server.crt", "configs/server.key", ], user_configs=["configs/default_passwd.xml"], with_zookeeper=True, ) LOADS_QUERY = "SELECT value FROM system.events WHERE event = 'MainConfigLoads'" # Use grpcio-tools to generate *pb2.py files from *.proto. proto_dir = Path(__file__).parent / "protos" gen_dir = Path(__file__).parent / "_gen" gen_dir.mkdir(exist_ok=True) run_and_check( f"python3 -m grpc_tools.protoc -I{proto_dir!s} --python_out={gen_dir!s} --grpc_python_out={gen_dir!s} \ {proto_dir!s}/clickhouse_grpc.proto", shell=True, ) sys.path.append(str(gen_dir)) import clickhouse_grpc_pb2 import clickhouse_grpc_pb2_grpc @pytest.fixture(name="cluster", scope="module") def fixture_cluster(): try: cluster.add_zookeeper_startup_command(configure_ports_from_zk) cluster.start() yield cluster finally: cluster.shutdown() @pytest.fixture(name="zk", scope="module") def fixture_zk(cluster): return cluster.get_kazoo_client("zoo1") def get_client(cluster, port): return Client( host=cluster.get_instance_ip("instance"), port=port, command=cluster.client_bin_path, ) def get_mysql_client(cluster, port): start_time = time.monotonic() while True: try: return pymysql.connections.Connection( host=cluster.get_instance_ip("instance"), user="default", password="", database="default", port=port, ) except pymysql.err.OperationalError: if time.monotonic() - start_time > 10: raise time.sleep(0.1) def get_pgsql_client(cluster, port): start_time = time.monotonic() while True: try: return psycopg2.connect( host=cluster.get_instance_ip("instance"), user="postgresql", password="123", database="default", port=port, ) except psycopg2.OperationalError: if time.monotonic() - start_time > 10: raise time.sleep(0.1) def get_grpc_channel(cluster, port): host_port = cluster.get_instance_ip("instance") + f":{port}" channel = grpc.insecure_channel(host_port) grpc.channel_ready_future(channel).result(timeout=10) return channel def grpc_query(channel, query_text): query_info = clickhouse_grpc_pb2.QueryInfo(query=query_text) stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(channel) result = stub.ExecuteQuery(query_info) if result and result.HasField("exception"): raise Exception(result.exception.display_text) return result.output.decode() def configure_ports_from_zk(zk, querier=None): default_config = [ ("/clickhouse/listen_hosts", b"0.0.0.0"), ("/clickhouse/ports/tcp", b"9000"), ("/clickhouse/ports/http", b"8123"), ("/clickhouse/ports/mysql", b"9004"), ("/clickhouse/ports/postgresql", b"9005"), ("/clickhouse/ports/grpc", b"9100"), ] for path, value in default_config: if querier is not None: loads_before = querier(LOADS_QUERY) has_changed = False try: zk.create(path=path, value=value, makepath=True) has_changed = True except NodeExistsError: if zk.get(path) != value: zk.set(path=path, value=value) has_changed = True if has_changed and querier is not None: wait_loaded_config_changed(loads_before, querier) @contextlib.contextmanager def sync_loaded_config(querier): # Depending on whether we test a change on tcp or http # we monitor canges using the other, untouched, protocol loads_before = querier(LOADS_QUERY) yield wait_loaded_config_changed(loads_before, querier) def wait_loaded_config_changed(loads_before, querier): loads_after = None start_time = time.monotonic() while time.monotonic() - start_time < 10: try: loads_after = querier(LOADS_QUERY) if loads_after != loads_before: return except (QueryRuntimeException, ConnectionError): pass time.sleep(0.1) assert loads_after is not None and loads_after != loads_before @contextlib.contextmanager def default_client(cluster, zk, restore_via_http=False): client = get_client(cluster, port=9000) try: yield client finally: querier = instance.http_query if restore_via_http else client.query configure_ports_from_zk(zk, querier) def test_change_tcp_port(cluster, zk): with default_client(cluster, zk, restore_via_http=True) as client: assert client.query("SELECT 1") == "1\n" with sync_loaded_config(instance.http_query): zk.set("/clickhouse/ports/tcp", b"9090") with pytest.raises(QueryRuntimeException, match="Connection refused"): client.query("SELECT 1") client_on_new_port = get_client(cluster, port=9090) assert client_on_new_port.query("SELECT 1") == "1\n" def test_change_http_port(cluster, zk): with default_client(cluster, zk) as client: retry_strategy = Retry(total=10, backoff_factor=0.1) assert instance.http_query("SELECT 1", retry_strategy=retry_strategy) == "1\n" with sync_loaded_config(client.query): zk.set("/clickhouse/ports/http", b"9090") with pytest.raises(ConnectionError, match="Connection refused"): instance.http_query("SELECT 1") instance.http_query("SELECT 1", port=9090) == "1\n" def test_change_mysql_port(cluster, zk): with default_client(cluster, zk) as client: mysql_client = get_mysql_client(cluster, port=9004) assert mysql_client.query("SELECT 1") == 1 with sync_loaded_config(client.query): zk.set("/clickhouse/ports/mysql", b"9090") with pytest.raises(pymysql.err.OperationalError, match="Lost connection"): mysql_client.query("SELECT 1") mysql_client_on_new_port = get_mysql_client(cluster, port=9090) assert mysql_client_on_new_port.query("SELECT 1") == 1 def test_change_postgresql_port(cluster, zk): with default_client(cluster, zk) as client: pgsql_client = get_pgsql_client(cluster, port=9005) cursor = pgsql_client.cursor() cursor.execute("SELECT 1") assert cursor.fetchall() == [(1,)] with sync_loaded_config(client.query): zk.set("/clickhouse/ports/postgresql", b"9090") with pytest.raises(psycopg2.OperationalError, match="closed"): cursor.execute("SELECT 1") pgsql_client_on_new_port = get_pgsql_client(cluster, port=9090) cursor = pgsql_client_on_new_port.cursor() cursor.execute("SELECT 1") cursor.fetchall() == [(1,)] def test_change_grpc_port(cluster, zk): with default_client(cluster, zk) as client: grpc_channel = get_grpc_channel(cluster, port=9100) assert grpc_query(grpc_channel, "SELECT 1") == "1\n" with sync_loaded_config(client.query): zk.set("/clickhouse/ports/grpc", b"9090") with pytest.raises( grpc._channel._InactiveRpcError, match="StatusCode.UNAVAILABLE" ): grpc_query(grpc_channel, "SELECT 1") grpc_channel_on_new_port = get_grpc_channel(cluster, port=9090) assert grpc_query(grpc_channel_on_new_port, "SELECT 1") == "1\n" def test_remove_tcp_port(cluster, zk): with default_client(cluster, zk, restore_via_http=True) as client: assert client.query("SELECT 1") == "1\n" with sync_loaded_config(instance.http_query): zk.delete("/clickhouse/ports/tcp") with pytest.raises(QueryRuntimeException, match="Connection refused"): client.query("SELECT 1") def test_remove_http_port(cluster, zk): with default_client(cluster, zk) as client: assert instance.http_query("SELECT 1") == "1\n" with sync_loaded_config(client.query): zk.delete("/clickhouse/ports/http") with pytest.raises(ConnectionError, match="Connection refused"): instance.http_query("SELECT 1") def test_remove_mysql_port(cluster, zk): with default_client(cluster, zk) as client: mysql_client = get_mysql_client(cluster, port=9004) assert mysql_client.query("SELECT 1") == 1 with sync_loaded_config(client.query): zk.delete("/clickhouse/ports/mysql") with pytest.raises(pymysql.err.OperationalError, match="Lost connection"): mysql_client.query("SELECT 1") def test_remove_postgresql_port(cluster, zk): with default_client(cluster, zk) as client: pgsql_client = get_pgsql_client(cluster, port=9005) cursor = pgsql_client.cursor() cursor.execute("SELECT 1") assert cursor.fetchall() == [(1,)] with sync_loaded_config(client.query): zk.delete("/clickhouse/ports/postgresql") with pytest.raises(psycopg2.OperationalError, match="closed"): cursor.execute("SELECT 1") def test_remove_grpc_port(cluster, zk): with default_client(cluster, zk) as client: grpc_channel = get_grpc_channel(cluster, port=9100) assert grpc_query(grpc_channel, "SELECT 1") == "1\n" with sync_loaded_config(client.query): zk.delete("/clickhouse/ports/grpc") with pytest.raises( grpc._channel._InactiveRpcError, match="StatusCode.UNAVAILABLE" ): grpc_query(grpc_channel, "SELECT 1") def test_change_listen_host(cluster, zk): localhost_client = Client( host="127.0.0.1", port=9000, command="/usr/bin/clickhouse" ) localhost_client.command = [ "docker", "exec", "-i", instance.docker_id, ] + localhost_client.command try: client = get_client(cluster, port=9000) with sync_loaded_config(localhost_client.query): zk.set("/clickhouse/listen_hosts", b"127.0.0.1") with pytest.raises(QueryRuntimeException, match="Connection refused"): client.query("SELECT 1") assert localhost_client.query("SELECT 1") == "1\n" finally: with sync_loaded_config(localhost_client.query): configure_ports_from_zk(zk)