# pylint: disable=wrong-import-order
# pylint: disable=line-too-long
# pylint: disable=redefined-builtin
# pylint: disable=redefined-outer-name
# pylint: disable=protected-access
# pylint: disable=broad-except
import contextlib
import grpc
import psycopg2
import pymysql.connections
import pymysql.err
import pytest
import sys
import os
import time
import logging
from helpers.cluster import ClickHouseCluster, run_and_check
from helpers.client import Client, QueryRuntimeException
from kazoo.exceptions import NodeExistsError
from pathlib import Path
from requests.exceptions import ConnectionError
from urllib3.util.retry import Retry
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance(
"instance",
main_configs=[
"configs/overrides_from_zk.xml",
"configs/ssl_conf.xml",
"configs/dhparam.pem",
"configs/server.crt",
"configs/server.key",
],
user_configs=["configs/default_passwd.xml"],
with_zookeeper=True,
# Bug in TSAN reproduces in this test https://github.com/grpc/grpc/issues/29550#issuecomment-1188085387
env_variables={
"TSAN_OPTIONS": "report_atomic_races=0 " + os.getenv("TSAN_OPTIONS")
},
)
LOADS_QUERY = "SELECT value FROM system.events WHERE event = 'MainConfigLoads'"
# Use grpcio-tools to generate *pb2.py files from *.proto.
proto_dir = Path(__file__).parent / "protos"
gen_dir = Path(__file__).parent / "_gen"
gen_dir.mkdir(exist_ok=True)
run_and_check(
f"python3 -m grpc_tools.protoc -I{proto_dir!s} --python_out={gen_dir!s} --grpc_python_out={gen_dir!s} \
{proto_dir!s}/clickhouse_grpc.proto",
shell=True,
)
sys.path.append(str(gen_dir))
import clickhouse_grpc_pb2
import clickhouse_grpc_pb2_grpc
@pytest.fixture(name="cluster", scope="module")
def fixture_cluster():
try:
cluster.add_zookeeper_startup_command(configure_from_zk)
cluster.start()
yield cluster
finally:
cluster.shutdown()
@pytest.fixture(name="zk", scope="module")
def fixture_zk(cluster):
return cluster.get_kazoo_client("zoo1")
def get_client(cluster, port):
return Client(
host=cluster.get_instance_ip("instance"),
port=port,
command=cluster.client_bin_path,
)
def get_mysql_client(cluster, port):
start_time = time.monotonic()
while True:
try:
return pymysql.connections.Connection(
host=cluster.get_instance_ip("instance"),
user="default",
password="",
database="default",
port=port,
)
except pymysql.err.OperationalError:
if time.monotonic() - start_time > 10:
raise
time.sleep(0.1)
def get_pgsql_client(cluster, port):
start_time = time.monotonic()
while True:
try:
return psycopg2.connect(
host=cluster.get_instance_ip("instance"),
user="postgresql",
password="123",
database="default",
port=port,
)
except psycopg2.OperationalError:
if time.monotonic() - start_time > 10:
raise
time.sleep(0.1)
@contextlib.contextmanager
def get_grpc_channel(cluster, port):
host_port = cluster.get_instance_ip("instance") + f":{port}"
channel = grpc.insecure_channel(host_port)
grpc.channel_ready_future(channel).result(timeout=10)
try:
yield channel
finally:
channel.close()
def grpc_query(channel, query_text):
query_info = clickhouse_grpc_pb2.QueryInfo(query=query_text)
stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(channel)
result = stub.ExecuteQuery(query_info)
if result and result.HasField("exception"):
raise Exception(result.exception.display_text)
return result.output.decode()
def configure_from_zk(zk, querier=None):
default_config = [
("/clickhouse/listen_hosts", b"0.0.0.0"),
("/clickhouse/ports/tcp", b"9000"),
("/clickhouse/ports/http", b"8123"),
("/clickhouse/ports/mysql", b"9004"),
("/clickhouse/ports/postgresql", b"9005"),
("/clickhouse/ports/grpc", b"9100"),
("/clickhouse/http_handlers", b""),
]
for path, value in default_config:
if querier is not None:
loads_before = querier(LOADS_QUERY)
has_changed = False
try:
zk.create(path=path, value=value, makepath=True)
has_changed = True
except NodeExistsError:
if zk.get(path)[0] != value:
zk.set(path=path, value=value)
has_changed = True
if has_changed and querier is not None:
wait_loaded_config_changed(loads_before, querier)
@contextlib.contextmanager
def sync_loaded_config(querier):
# Depending on whether we test a change on tcp or http
# we monitor canges using the other, untouched, protocol
loads_before = querier(LOADS_QUERY)
yield
wait_loaded_config_changed(loads_before, querier)
def wait_loaded_config_changed(loads_before, querier):
loads_after = None
start_time = time.monotonic()
while time.monotonic() - start_time < 10:
try:
loads_after = querier(LOADS_QUERY)
if loads_after != loads_before:
return
except (QueryRuntimeException, ConnectionError):
pass
time.sleep(0.1)
assert loads_after is not None and loads_after != loads_before
@contextlib.contextmanager
def default_client(cluster, zk, restore_via_http=False):
client = get_client(cluster, port=9000)
try:
yield client
finally:
querier = instance.http_query if restore_via_http else client.query
configure_from_zk(zk, querier)
def test_change_tcp_port(cluster, zk):
with default_client(cluster, zk, restore_via_http=True) as client:
assert client.query("SELECT 1") == "1\n"
with sync_loaded_config(instance.http_query):
zk.set("/clickhouse/ports/tcp", b"9090")
with pytest.raises(QueryRuntimeException, match="Connection refused"):
client.query("SELECT 1")
client_on_new_port = get_client(cluster, port=9090)
assert client_on_new_port.query("SELECT 1") == "1\n"
def test_change_http_port(cluster, zk):
with default_client(cluster, zk) as client:
retry_strategy = Retry(total=10, backoff_factor=0.1)
assert instance.http_query("SELECT 1", retry_strategy=retry_strategy) == "1\n"
with sync_loaded_config(client.query):
zk.set("/clickhouse/ports/http", b"9090")
with pytest.raises(ConnectionError, match="Connection refused"):
instance.http_query("SELECT 1")
assert instance.http_query("SELECT 1", port=9090) == "1\n"
def test_change_mysql_port(cluster, zk):
with default_client(cluster, zk) as client:
mysql_client = get_mysql_client(cluster, port=9004)
assert mysql_client.query("SELECT 1") == 1
with sync_loaded_config(client.query):
zk.set("/clickhouse/ports/mysql", b"9090")
with pytest.raises(pymysql.err.OperationalError, match="Lost connection"):
mysql_client.query("SELECT 1")
mysql_client_on_new_port = get_mysql_client(cluster, port=9090)
assert mysql_client_on_new_port.query("SELECT 1") == 1
def test_change_postgresql_port(cluster, zk):
with default_client(cluster, zk) as client:
pgsql_client = get_pgsql_client(cluster, port=9005)
cursor = pgsql_client.cursor()
cursor.execute("SELECT 1")
assert cursor.fetchall() == [(1,)]
with sync_loaded_config(client.query):
zk.set("/clickhouse/ports/postgresql", b"9090")
with pytest.raises(psycopg2.OperationalError, match="closed"):
cursor.execute("SELECT 1")
pgsql_client_on_new_port = get_pgsql_client(cluster, port=9090)
cursor = pgsql_client_on_new_port.cursor()
cursor.execute("SELECT 1")
assert cursor.fetchall() == [(1,)]
def test_change_grpc_port(cluster, zk):
with default_client(cluster, zk) as client:
with get_grpc_channel(cluster, port=9100) as grpc_channel:
assert grpc_query(grpc_channel, "SELECT 1") == "1\n"
with sync_loaded_config(client.query):
zk.set("/clickhouse/ports/grpc", b"9090")
with pytest.raises(
grpc._channel._InactiveRpcError, match="StatusCode.UNAVAILABLE"
):
grpc_query(grpc_channel, "SELECT 1")
with get_grpc_channel(cluster, port=9090) as grpc_channel_on_new_port:
assert grpc_query(grpc_channel_on_new_port, "SELECT 1") == "1\n"
def test_remove_tcp_port(cluster, zk):
with default_client(cluster, zk, restore_via_http=True) as client:
assert client.query("SELECT 1") == "1\n"
with sync_loaded_config(instance.http_query):
zk.delete("/clickhouse/ports/tcp")
with pytest.raises(QueryRuntimeException, match="Connection refused"):
client.query("SELECT 1")
def test_remove_http_port(cluster, zk):
with default_client(cluster, zk) as client:
assert instance.http_query("SELECT 1") == "1\n"
with sync_loaded_config(client.query):
zk.delete("/clickhouse/ports/http")
with pytest.raises(ConnectionError, match="Connection refused"):
instance.http_query("SELECT 1")
def test_remove_mysql_port(cluster, zk):
with default_client(cluster, zk) as client:
mysql_client = get_mysql_client(cluster, port=9004)
assert mysql_client.query("SELECT 1") == 1
with sync_loaded_config(client.query):
zk.delete("/clickhouse/ports/mysql")
with pytest.raises(pymysql.err.OperationalError, match="Lost connection"):
mysql_client.query("SELECT 1")
def test_remove_postgresql_port(cluster, zk):
with default_client(cluster, zk) as client:
pgsql_client = get_pgsql_client(cluster, port=9005)
cursor = pgsql_client.cursor()
cursor.execute("SELECT 1")
assert cursor.fetchall() == [(1,)]
with sync_loaded_config(client.query):
zk.delete("/clickhouse/ports/postgresql")
with pytest.raises(psycopg2.OperationalError, match="closed"):
cursor.execute("SELECT 1")
def test_remove_grpc_port(cluster, zk):
with default_client(cluster, zk) as client:
with get_grpc_channel(cluster, port=9100) as grpc_channel:
assert grpc_query(grpc_channel, "SELECT 1") == "1\n"
with sync_loaded_config(client.query):
zk.delete("/clickhouse/ports/grpc")
with pytest.raises(
grpc._channel._InactiveRpcError, match="StatusCode.UNAVAILABLE"
):
grpc_query(grpc_channel, "SELECT 1")
def test_change_listen_host(cluster, zk):
localhost_client = Client(
host="127.0.0.1", port=9000, command="/usr/bin/clickhouse"
)
localhost_client.command = [
"docker",
"exec",
"-i",
instance.docker_id,
] + localhost_client.command
try:
client = get_client(cluster, port=9000)
with sync_loaded_config(localhost_client.query):
zk.set("/clickhouse/listen_hosts", b"127.0.0.1")
with pytest.raises(QueryRuntimeException, match="Connection refused"):
client.query("SELECT 1")
assert localhost_client.query("SELECT 1") == "1\n"
finally:
with sync_loaded_config(localhost_client.query):
configure_from_zk(zk)
# This is a regression test for the case when the clickhouse-server was waiting
# for the connection that had been issued "SYSTEM RELOAD CONFIG" indefinitely.
#
# Configuration reload directly from the query,
# "directly from the query" means that reload was done from the query context
# over periodic config reload (that is done each 2 seconds).
def test_reload_via_client(cluster, zk):
exception = None
localhost_client = Client(
host="127.0.0.1", port=9000, command="/usr/bin/clickhouse"
)
localhost_client.command = [
"docker",
"exec",
"-i",
instance.docker_id,
] + localhost_client.command
# NOTE: reload via zookeeper is too fast, but 100 iterations was enough, even for debug build.
for i in range(0, 100):
try:
client = get_client(cluster, port=9000)
zk.set("/clickhouse/listen_hosts", b"127.0.0.1")
query_id = f"reload_config_{i}"
client.query("SYSTEM RELOAD CONFIG", query_id=query_id)
assert int(localhost_client.query("SELECT 1")) == 1
localhost_client.query("SYSTEM FLUSH LOGS")
MainConfigLoads = int(
localhost_client.query(
f"""
SELECT ProfileEvents['MainConfigLoads']
FROM system.query_log
WHERE query_id = '{query_id}' AND type = 'QueryFinish'
"""
)
)
assert MainConfigLoads == 1
logging.info("MainConfigLoads = %s (retry %s)", MainConfigLoads, i)
exception = None
break
except Exception as e:
logging.exception("Retry %s", i)
exception = e
finally:
while True:
try:
with sync_loaded_config(localhost_client.query):
configure_from_zk(zk)
break
except QueryRuntimeException:
logging.exception("The new socket is not binded yet")
time.sleep(0.1)
if exception:
raise exception
def test_change_http_handlers(cluster, zk):
with default_client(cluster, zk) as client:
curl_result = instance.exec_in_container(
["bash", "-c", "curl -s '127.0.0.1:8123/it_works'"]
)
assert "There is no handle /it_works" in curl_result
with sync_loaded_config(client.query):
zk.set(
"/clickhouse/http_handlers",
b"""
/it_works
GET
predefined_query_handler
SELECT 'It works.'
""",
)
curl_result = instance.exec_in_container(
["bash", "-c", "curl -s '127.0.0.1:8123/it_works'"]
)
assert curl_result == "It works.\n"