ClickHouse/tests/integration/helpers/cluster.py

2639 lines
123 KiB
Python
Raw Normal View History

import base64
import errno
2020-10-02 16:54:07 +00:00
import http.client
import logging
import os
2021-03-05 13:39:51 +00:00
import stat
import os.path as p
import pprint
import pwd
import re
import shutil
import socket
import subprocess
import time
import traceback
2020-10-02 16:54:07 +00:00
import urllib.parse
import shlex
2021-04-29 11:57:48 +00:00
import urllib3
2021-04-07 12:22:53 +00:00
from cassandra.policies import RoundRobinPolicy
import cassandra.cluster
import psycopg2
import pymongo
import pymysql
import requests
2021-02-24 11:46:58 +00:00
from confluent_kafka.avro.cached_schema_registry_client import \
CachedSchemaRegistryClient
2021-01-27 09:50:11 +00:00
from dict2xml import dict2xml
from kazoo.client import KazooClient
from kazoo.exceptions import KazooException
from minio import Minio
2021-07-12 08:32:20 +00:00
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
2021-10-19 10:19:43 +00:00
from helpers.test_tools import assert_eq_with_retry, exec_query_with_retry
2021-07-01 14:41:59 +00:00
from helpers import pytest_xdist_logging_to_separate_files
from helpers.client import QueryRuntimeException
2021-02-24 11:46:58 +00:00
import docker
from .client import Client
2021-06-13 14:02:08 +00:00
from .hdfs_api import HDFSApi
# Directory that contains this helpers module.
HELPERS_DIR = p.dirname(__file__)
# Three levels above the helpers directory (not normalized, the "../../.." stays in the path).
CLICKHOUSE_ROOT_DIR = p.join(HELPERS_DIR, "../../..")
# Compose files used when neither DOCKER_COMPOSE_DIR nor /compose/ is available.
LOCAL_DOCKER_COMPOSE_DIR = p.join(CLICKHOUSE_ROOT_DIR, "docker/test/integration/runner/compose/")

# File name of the docker-compose env file placed in the instances directory.
DEFAULT_ENV_NAME = '.env'
SANITIZER_SIGN = "=================="
2021-02-19 12:58:11 +00:00
# to create docker-compose env file
2021-02-12 15:51:21 +00:00
def _create_env_file(path, variables):
    """Write `variables` to `path` in docker-compose env-file format.

    Every entry becomes one ``NAME=value`` line. Values are formatted with
    ``str()``, so non-string values (for example integer ports) are written
    instead of raising ``TypeError``.

    :param path: destination file; it is created or truncated
    :param variables: mapping of variable name to value
    :return: `path`, unchanged
    """
    logging.debug(f"Env {variables} stored in {path}")
    with open(path, 'w') as f:
        f.writelines(f"{var}={value}\n" for var, value in variables.items())
    return path
2021-06-04 10:14:32 +00:00
def run_and_check(args, env=None, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=300, nothrow=False, detach=False):
    """Run a command, log its output line by line and return the captured stdout.

    :param args: command, as accepted by `subprocess.run` / `subprocess.Popen`
    :param env: environment for the child process (None inherits the current one)
    :param shell: run the command through the shell
    :param stdout: destination of the child's stdout; it is only captured when this is a pipe
    :param stderr: destination of the child's stderr; it is only captured when this is a pipe
    :param timeout: seconds after which `subprocess.run` raises `subprocess.TimeoutExpired`
    :param nothrow: when true, a non-zero exit code is logged but not raised
    :param detach: start the command in the background with its output discarded
        and return None immediately (exit code is never checked)
    :return: decoded stdout ('' when stdout was not captured), or None when `detach` is set
    :raises Exception: when the command exits with a non-zero code and `nothrow` is false
    """
    if detach:
        subprocess.Popen(args, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, env=env, shell=shell)
        return

    logging.debug(f"Command:{args}")
    res = subprocess.run(args, stdout=stdout, stderr=stderr, env=env, shell=shell, timeout=timeout)
    # res.stdout / res.stderr are None when the stream was not redirected to a pipe
    # (e.g. stdout=subprocess.DEVNULL); treat that as empty output instead of crashing.
    out = res.stdout.decode('utf-8') if res.stdout is not None else ''
    err = res.stderr.decode('utf-8') if res.stderr is not None else ''
    # check_call(...) from subprocess does not print stderr, so we do it manually
    for outline in out.splitlines():
        logging.debug(f"Stdout:{outline}")
    for errline in err.splitlines():
        logging.debug(f"Stderr:{errline}")
    if res.returncode != 0:
        logging.debug(f"Exitcode:{res.returncode}")
        if env:
            logging.debug(f"Env:{env}")
        if not nothrow:
            raise Exception(f"Command {args} return non-zero code {res.returncode}: {err}")
    return out
2021-01-22 14:27:23 +00:00
2021-02-12 15:51:21 +00:00
# Based on https://stackoverflow.com/questions/2838244/get-open-tcp-port-in-python/2838309#2838309
2021-05-11 14:27:38 +00:00
def get_free_port():
    """Ask the OS for an unused TCP port and return its number.

    A temporary socket is bound to port 0 so the OS picks the port. The socket
    is closed before returning (also when binding or listening fails), so the
    port is free for the caller but nothing reserves it in the meantime.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(("", 0))
        s.listen(1)
        return s.getsockname()[1]
2021-01-22 14:27:23 +00:00
def retry_exception(num, delay, func, exception=Exception, *args, **kwargs):
    """
    Call `func(*args, **kwargs)` until it succeeds, retrying up to `num` times.

    :param num: number of retries, so `func` is called at most `num + 1` times
    :param delay: seconds to wait after a failed attempt before the next one
    :param func: func to run
    :param exception: exception type (or tuple of types) that triggers a retry;
        anything else propagates to the caller
    :throws StopIteration: when every attempt raised `exception`
    """
    failures = 0
    while failures <= num:
        try:
            func(*args, **kwargs)
        except exception:  # pylint: disable=broad-except
            failures += 1
            # Wait between attempts only: not after a success and not after the last failure.
            if failures <= num:
                time.sleep(delay)
            continue
        return
    raise StopIteration('Function did not finished successfully')
2021-06-01 09:53:36 +00:00
def subprocess_check_call(args, detach=False, nothrow=False):
    """Forward `args`, `detach` and `nothrow` to `run_and_check` and return its result."""
    # Uncomment for debugging
    #logging.info('run:' + ' '.join(args))
    result = run_and_check(args, nothrow=nothrow, detach=detach)
    return result
2018-09-28 14:53:20 +00:00
2021-07-27 15:54:13 +00:00
def get_odbc_bridge_path():
    """Return the path of the clickhouse-odbc-bridge binary.

    Lookup order:
      1. CLICKHOUSE_TESTS_ODBC_BRIDGE_BIN_PATH, when set;
      2. 'clickhouse-odbc-bridge' in the directory of CLICKHOUSE_TESTS_SERVER_BIN_PATH, when set;
      3. '/usr/bin/clickhouse-odbc-bridge'.
    """
    explicit_path = os.environ.get('CLICKHOUSE_TESTS_ODBC_BRIDGE_BIN_PATH')
    if explicit_path is not None:
        return explicit_path

    server_binary = os.environ.get('CLICKHOUSE_TESTS_SERVER_BIN_PATH')
    if server_binary is None:
        return '/usr/bin/clickhouse-odbc-bridge'
    return os.path.join(os.path.dirname(server_binary), 'clickhouse-odbc-bridge')
2021-03-11 17:48:47 +00:00
def get_library_bridge_path():
    """Return the path of the clickhouse-library-bridge binary.

    Lookup order:
      1. CLICKHOUSE_TESTS_LIBRARY_BRIDGE_BIN_PATH, when set;
      2. 'clickhouse-library-bridge' in the directory of CLICKHOUSE_TESTS_SERVER_BIN_PATH, when set;
      3. '/usr/bin/clickhouse-library-bridge'.
    """
    explicit_path = os.environ.get('CLICKHOUSE_TESTS_LIBRARY_BRIDGE_BIN_PATH')
    if explicit_path is not None:
        return explicit_path

    server_binary = os.environ.get('CLICKHOUSE_TESTS_SERVER_BIN_PATH')
    if server_binary is None:
        return '/usr/bin/clickhouse-library-bridge'
    return os.path.join(os.path.dirname(server_binary), 'clickhouse-library-bridge')
def get_docker_compose_path():
    """Return the directory that holds the docker-compose files.

    Uses `os.path.dirname` of DOCKER_COMPOSE_DIR when that variable is set
    (so the value is expected to end with a path separator), otherwise
    '/compose' when it exists, otherwise LOCAL_DOCKER_COMPOSE_DIR.
    """
    configured_dir = os.environ.get('DOCKER_COMPOSE_DIR')
    if configured_dir is not None:
        return os.path.dirname(configured_dir)

    runner_dir = os.path.dirname('/compose/')
    if os.path.exists(runner_dir):
        return runner_dir  # default in docker runner container

    logging.debug(f"Fallback docker_compose_path to LOCAL_DOCKER_COMPOSE_DIR: {LOCAL_DOCKER_COMPOSE_DIR}")
    return LOCAL_DOCKER_COMPOSE_DIR
2021-02-24 11:46:58 +00:00
def check_kafka_is_available(kafka_id, kafka_port):
    """Run kafka-broker-api-versions inside container `kafka_id` via `docker exec`.

    :param kafka_id: docker container id or name passed to `docker exec`
    :param kafka_port: port used in the INSIDE://localhost bootstrap address
    :return: True when the command exits with code 0
    """
    command = (
        'docker',
        'exec',
        '-i',
        kafka_id,
        '/usr/bin/kafka-broker-api-versions',
        '--bootstrap-server',
        f'INSIDE://localhost:{kafka_port}',
    )
    probe = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    # Output is captured only to keep it off the console; the exit code is what matters.
    probe.communicate()
    exit_code = probe.returncode
    return exit_code == 0
2021-04-30 09:18:12 +00:00
def check_rabbitmq_is_available(rabbitmq_id):
    """Run `rabbitmqctl await_startup` inside container `rabbitmq_id` via `docker exec`.

    :param rabbitmq_id: docker container id or name passed to `docker exec`
    :return: True when the command exits with code 0
    """
    command = (
        'docker',
        'exec',
        '-i',
        rabbitmq_id,
        'rabbitmqctl',
        'await_startup',
    )
    # Only stdout is piped; stderr of the command goes to this process' stderr.
    probe = subprocess.Popen(command, stdout=subprocess.PIPE)
    probe.communicate()
    exit_code = probe.returncode
    return exit_code == 0
def enable_consistent_hash_plugin(rabbitmq_id):
    """Run `rabbitmq-plugins enable rabbitmq_consistent_hash_exchange` inside container `rabbitmq_id`.

    :param rabbitmq_id: docker container id or name passed to `docker exec`
    :return: True when the command exits with code 0
    """
    command = (
        'docker',
        'exec',
        '-i',
        rabbitmq_id,
        "rabbitmq-plugins",
        "enable",
        "rabbitmq_consistent_hash_exchange",
    )
    # Only stdout is piped; stderr of the command goes to this process' stderr.
    plugin_proc = subprocess.Popen(command, stdout=subprocess.PIPE)
    plugin_proc.communicate()
    exit_code = plugin_proc.returncode
    return exit_code == 0
2021-06-21 08:02:27 +00:00
def get_instances_dir():
    """Return the instances directory name for the current test run.

    '_instances' when INTEGRATION_TESTS_RUN_ID is unset or empty, otherwise
    '_instances_' followed by the shell-quoted run id.
    """
    run_id = os.environ.get('INTEGRATION_TESTS_RUN_ID')
    if not run_id:
        return '_instances'
    return '_instances_' + shlex.quote(run_id)
class ClickHouseCluster:
"""ClickHouse cluster with several instances and (possibly) ZooKeeper.
Add instances with several calls to add_instance(), then start them with the start() call.
Directories for instances are created in the directory of base_path. After cluster is started,
these directories will contain logs, database files, docker-compose config, ClickHouse configs etc.
"""
2020-08-14 15:51:28 +00:00
def __init__(self, base_path, name=None, base_config_dir=None, server_bin_path=None, client_bin_path=None,
             odbc_bridge_bin_path=None, library_bridge_bin_path=None, zookeeper_config_path=None, custom_dockerd_host=None,
             zookeeper_keyfile=None, zookeeper_certfile=None):
    """Compute paths, the docker-compose base command and default settings of every optional service.

    :param base_path: path whose directory becomes `base_dir`; instance directories live under it
    :param name: optional cluster name; it is appended to the project name and to the instances directory name
    :param base_config_dir: config directory; defaults to CLICKHOUSE_TESTS_BASE_CONFIG_DIR, then '/etc/clickhouse-server/'
    :param server_bin_path: server binary; defaults to CLICKHOUSE_TESTS_SERVER_BIN_PATH, then '/usr/bin/clickhouse'
    :param client_bin_path: client binary; defaults to CLICKHOUSE_TESTS_CLIENT_BIN_PATH, then '/usr/bin/clickhouse-client'
    :param odbc_bridge_bin_path: odbc bridge binary; defaults to `get_odbc_bridge_path()`
    :param library_bridge_bin_path: library bridge binary; defaults to `get_library_bridge_path()`
    :param zookeeper_config_path: config path relative to `base_dir`; defaults to 'zookeeper_config.xml' in HELPERS_DIR
    :param custom_dockerd_host: value for docker-compose '--host'; defaults to CLICKHOUSE_TESTS_DOCKERD_HOST
    :param zookeeper_keyfile: stored as `self.zookeeper_keyfile`
    :param zookeeper_certfile: stored as `self.zookeeper_certfile`
    """
    # Dump the whole environment to the debug log.
    for param, value in list(os.environ.items()):
        logging.debug("ENV %40s %s" % (param, value))

    self.base_path = base_path
    self.base_dir = p.dirname(base_path)
    self.name = '' if name is None else name

    # Explicit arguments win over environment variables, which win over built-in defaults.
    default_config_dir = os.environ.get('CLICKHOUSE_TESTS_BASE_CONFIG_DIR', '/etc/clickhouse-server/')
    self.base_config_dir = base_config_dir or default_config_dir
    self.server_bin_path = p.realpath(
        server_bin_path or os.environ.get('CLICKHOUSE_TESTS_SERVER_BIN_PATH', '/usr/bin/clickhouse'))
    self.odbc_bridge_bin_path = p.realpath(odbc_bridge_bin_path or get_odbc_bridge_path())
    self.library_bridge_bin_path = p.realpath(library_bridge_bin_path or get_library_bridge_path())
    self.client_bin_path = p.realpath(
        client_bin_path or os.environ.get('CLICKHOUSE_TESTS_CLIENT_BIN_PATH', '/usr/bin/clickhouse-client'))
    if zookeeper_config_path:
        self.zookeeper_config_path = p.join(self.base_dir, zookeeper_config_path)
    else:
        self.zookeeper_config_path = p.join(HELPERS_DIR, 'zookeeper_config.xml')

    raw_project_name = pwd.getpwuid(os.getuid()).pw_name + p.basename(self.base_dir) + self.name
    # docker-compose removes everything non-alphanumeric from project names so we do it too.
    self.project_name = re.sub(r'[^a-z0-9]', '', raw_project_name.lower())

    # Instances directory name: '_instances[_<name>][_<quoted run id>]'.
    dir_name_parts = ['_instances']
    if self.name:
        dir_name_parts.append(self.name)
    run_id = os.environ.get('INTEGRATION_TESTS_RUN_ID')
    if run_id:
        dir_name_parts.append(shlex.quote(run_id))
    self.instances_dir = p.join(self.base_dir, '_'.join(dir_name_parts))
    self.docker_logs_path = p.join(self.instances_dir, 'docker.log')
    self.env_file = p.join(self.instances_dir, DEFAULT_ENV_NAME)
    self.env_variables = {
        "TSAN_OPTIONS": "second_deadlock_stack=1",
        "CLICKHOUSE_WATCHDOG_ENABLE": "0",
    }
    self.up_called = False

    custom_dockerd_host = custom_dockerd_host or os.environ.get('CLICKHOUSE_TESTS_DOCKERD_HOST')
    self.docker_api_version = os.environ.get("DOCKER_API_VERSION")
    self.docker_base_tag = os.environ.get("DOCKER_BASE_TAG", "latest")

    # Base docker-compose invocation; the setup_*_cmd methods append '--file' arguments to it.
    compose_cmd = ['docker-compose']
    if custom_dockerd_host:
        compose_cmd += ['--host', custom_dockerd_host]
    compose_cmd += ['--env-file', self.env_file]
    compose_cmd += ['--project-name', self.project_name]
    self.base_cmd = compose_cmd

    self.base_zookeeper_cmd = None
    self.base_mysql_cmd = []
    self.base_kafka_cmd = []
    self.base_kerberized_kafka_cmd = []
    self.base_rabbitmq_cmd = []
    self.base_cassandra_cmd = []
    self.base_jdbc_bridge_cmd = []
    self.base_redis_cmd = []
    self.pre_zookeeper_commands = []
    self.instances = {}

    # Service flags, all off initially; the setup_*_cmd methods turn them on.
    self.with_zookeeper = False
    self.with_zookeeper_secure = False
    self.with_mysql_client = False
    self.with_mysql = False
    self.with_mysql8 = False
    self.with_mysql_cluster = False
    self.with_postgres = False
    self.with_postgres_cluster = False
    self.with_kafka = False
    self.with_kerberized_kafka = False
    self.with_rabbitmq = False
    self.with_odbc_drivers = False
    self.with_hdfs = False
    self.with_kerberized_hdfs = False
    self.with_mongo = False
    self.with_mongo_secure = False
    self.with_net_trics = False
    self.with_redis = False
    self.with_cassandra = False
    self.with_jdbc_bridge = False
    self.with_nginx = False

    # minio
    self.with_minio = False
    self.minio_dir = os.path.join(self.instances_dir, "minio")
    self.minio_certs_dir = None  # source for certificates
    self.minio_host = "minio1"
    self.minio_ip = None
    self.minio_bucket = "root"
    self.minio_bucket_2 = "root2"
    self.minio_port = 9001
    self.minio_client = None  # type: Minio
    self.minio_redirect_host = "proxy1"
    self.minio_redirect_ip = None
    self.minio_redirect_port = 8080

    # available when with_hdfs == True
    self.hdfs_host = "hdfs1"
    self.hdfs_ip = None
    self.hdfs_name_port = 50070
    self.hdfs_data_port = 50075
    self.hdfs_dir = p.abspath(p.join(self.instances_dir, "hdfs"))
    self.hdfs_logs_dir = os.path.join(self.hdfs_dir, "logs")
    self.hdfs_api = None  # also for kerberized hdfs

    # available when with_kerberized_hdfs == True
    self.hdfs_kerberized_host = "kerberizedhdfs1"
    self.hdfs_kerberized_ip = None
    self.hdfs_kerberized_name_port = 50070
    self.hdfs_kerberized_data_port = 1006
    self.hdfs_kerberized_dir = p.abspath(p.join(self.instances_dir, "kerberized_hdfs"))
    self.hdfs_kerberized_logs_dir = os.path.join(self.hdfs_kerberized_dir, "logs")

    # available when with_kafka == True
    self.kafka_host = "kafka1"
    self.kafka_port = get_free_port()
    self.kafka_docker_id = None
    self.schema_registry_host = "schema-registry"
    self.schema_registry_port = get_free_port()
    self.kafka_docker_id = self.get_instance_docker_id(self.kafka_host)

    # available when with_kerberized_kafka == True
    self.kerberized_kafka_host = "kerberized_kafka1"
    self.kerberized_kafka_port = get_free_port()
    self.kerberized_kafka_docker_id = self.get_instance_docker_id(self.kerberized_kafka_host)

    # available when with_mongo == True
    self.mongo_host = "mongo1"
    self.mongo_port = get_free_port()

    # available when with_cassandra == True
    self.cassandra_host = "cassandra1"
    self.cassandra_port = 9042
    self.cassandra_ip = None
    self.cassandra_id = self.get_instance_docker_id(self.cassandra_host)

    # available when with_rabbitmq == True
    self.rabbitmq_host = "rabbitmq1"
    self.rabbitmq_ip = None
    self.rabbitmq_port = 5672
    self.rabbitmq_dir = p.abspath(p.join(self.instances_dir, "rabbitmq"))
    self.rabbitmq_logs_dir = os.path.join(self.rabbitmq_dir, "logs")

    # available when with_nginx == True
    self.nginx_host = "nginx"
    self.nginx_ip = None
    self.nginx_port = 80
    self.nginx_id = self.get_instance_docker_id(self.nginx_host)

    # available when with_redis == True
    self.redis_host = "redis1"
    self.redis_port = get_free_port()

    # available when with_postgres == True
    self.postgres_host = "postgres1"
    self.postgres_ip = None
    self.postgres_conn = None
    self.postgres2_host = "postgres2"
    self.postgres2_ip = None
    self.postgres2_conn = None
    self.postgres3_host = "postgres3"
    self.postgres3_ip = None
    self.postgres3_conn = None
    self.postgres4_host = "postgres4"
    self.postgres4_ip = None
    self.postgres4_conn = None
    self.postgres_port = 5432
    self.postgres_dir = p.abspath(p.join(self.instances_dir, "postgres"))
    self.postgres_logs_dir = os.path.join(self.postgres_dir, "postgres1")
    self.postgres2_logs_dir = os.path.join(self.postgres_dir, "postgres2")
    self.postgres3_logs_dir = os.path.join(self.postgres_dir, "postgres3")
    self.postgres4_logs_dir = os.path.join(self.postgres_dir, "postgres4")

    # available when with_mysql_client == True
    self.mysql_client_host = "mysql_client"
    self.mysql_client_container = None

    # available when with_mysql == True
    self.mysql_host = "mysql57"
    self.mysql_port = 3306
    self.mysql_ip = None
    self.mysql_dir = p.abspath(p.join(self.instances_dir, "mysql"))
    self.mysql_logs_dir = os.path.join(self.mysql_dir, "logs")

    # available when with_mysql_cluster == True
    self.mysql2_host = "mysql2"
    self.mysql3_host = "mysql3"
    self.mysql4_host = "mysql4"
    self.mysql2_ip = None
    self.mysql3_ip = None
    self.mysql4_ip = None
    self.mysql_cluster_dir = p.abspath(p.join(self.instances_dir, "mysql"))
    # Built from mysql_dir, which holds the same path as mysql_cluster_dir.
    self.mysql_cluster_logs_dir = os.path.join(self.mysql_dir, "logs")

    # available when with_mysql8 == True
    self.mysql8_host = "mysql80"
    self.mysql8_port = 3306
    self.mysql8_ip = None
    self.mysql8_dir = p.abspath(p.join(self.instances_dir, "mysql8"))
    self.mysql8_logs_dir = os.path.join(self.mysql8_dir, "logs")

    # available when with_zookeeper_secure == True
    self.zookeeper_secure_port = 2281
    self.zookeeper_keyfile = zookeeper_keyfile
    self.zookeeper_certfile = zookeeper_certfile

    # available when with_zookeeper == True
    self.use_keeper = True
    self.zookeeper_port = 2181
    self.keeper_instance_dir_prefix = p.join(p.abspath(self.instances_dir), "keeper")  # if use_keeper = True
    self.zookeeper_instance_dir_prefix = p.join(self.instances_dir, "zk")
    self.zookeeper_dirs_to_create = []

    # available when with_jdbc_bridge == True
    self.jdbc_bridge_host = "bridge1"
    self.jdbc_bridge_ip = None
    self.jdbc_bridge_port = 9019
    self.jdbc_driver_dir = p.abspath(p.join(self.instances_dir, "jdbc_driver"))
    self.jdbc_driver_logs_dir = os.path.join(self.jdbc_driver_dir, "logs")

    self.docker_client = None
    self.is_up = False
    # Snapshot of the process environment taken at construction time.
    self.env = os.environ.copy()
    logging.debug(f"CLUSTER INIT base_config_dir:{self.base_config_dir}")
2021-04-27 09:07:21 +00:00
def cleanup(self):
    """Best-effort removal of leftovers from a previous launch of this project.

    Kills and removes containers whose names follow this project's
    docker-compose naming scheme, then prunes unused docker volumes when any
    volume exists. Does nothing when the DISABLE_CLEANUP environment variable
    is "1". Failures are logged and never propagated to the caller.
    """
    if os.environ.get('DISABLE_CLEANUP') == "1":
        logging.warning("Cleanup is disabled")
        return

    # Just in case kill unstopped containers from previous launch
    try:
        # docker-compose names containers using the following formula:
        # container_name = project_name + '_' + instance_name + '_1'
        # We need to have "^/" and "$" in the "--filter name" option below to filter by exact name of the container, see
        # https://stackoverflow.com/questions/48767760/how-to-make-docker-container-ls-f-name-filter-by-exact-name
        filter_name = f'^/{self.project_name}_.*_1$'
        # --quiet prints one container id per line and no header, so every
        # token of the output can be passed to `docker kill` / `docker rm`.
        list_cmd = ['docker', 'container', 'list', '--all', '--quiet', '--filter', f'name={filter_name}']
        unstopped_containers = run_and_check(list_cmd).split()
        if unstopped_containers:
            logging.debug(f"Trying to kill unstopped containers for project {self.project_name}:")
            logging.debug(f"Unstopped containers {unstopped_containers}")
            for container_id in unstopped_containers:
                run_and_check(['docker', 'kill', container_id], nothrow=True)
                run_and_check(['docker', 'rm', container_id], nothrow=True)
            left_ids = run_and_check(list_cmd)
            logging.debug(f"Unstopped containers killed. Left {left_ids}")
        else:
            logging.debug(f"No running containers for project: {self.project_name}")
    except Exception as ex:
        # Cleanup must never fail the caller; keep a trace of what went wrong.
        logging.debug(f"Failed to kill unstopped containers: {ex}")

    # Pruning of unused networks and images is currently disabled.

    # Remove unused volumes
    try:
        logging.debug("Trying to prune unused volumes...")
        # --quiet prints volume names only, so an empty output means no volumes.
        volumes = run_and_check(['docker', 'volume', 'ls', '--quiet']).split()
        if volumes:
            run_and_check(['docker', 'volume', 'prune', '-f'])
            logging.debug(f"Volumes pruned: {len(volumes)}")
    except Exception as ex:
        logging.debug(f"Failed to prune unused volumes: {ex}")
2021-04-13 14:55:31 +00:00
def get_docker_handle(self, docker_id):
    """Return `self.docker_client.containers.get(docker_id)`, retrying on errors.

    Up to five lookups are made, sleeping 0, 2, 4 and 6 seconds between them.
    There is no sleep after the last attempt; its exception is re-raised.

    :param docker_id: id passed to `containers.get`
    :raises Exception: the exception of the last failed lookup
    """
    attempts = 5
    exception = None
    for i in range(attempts):
        try:
            return self.docker_client.containers.get(docker_id)
        except Exception as ex:
            print("Got exception getting docker handle", str(ex))
            exception = ex
            # Waiting is only useful when another attempt follows.
            if i < attempts - 1:
                time.sleep(i * 2)
    raise exception
2021-04-13 14:55:31 +00:00
2018-09-07 11:51:51 +00:00
def get_client_cmd(self):
    """Return the client command line built from `self.client_bin_path`.

    When the binary's file name is exactly 'clickhouse', " client" is
    appended; any other binary path is returned unchanged.
    """
    binary = self.client_bin_path
    if p.basename(binary) != 'clickhouse':
        return binary
    return binary + " client"
def copy_file_from_container_to_container(self, src_node, src_path, dst_node, dst_path):
    """Copy `src_path` from the container of `src_node` to `dst_path` in the container of `dst_node`.

    `docker cp` is run twice, using `self.instances_dir` on the host as the
    staging location; the staged copy is left there. The commands are passed
    as argument lists, not through the shell, so paths containing spaces or
    shell metacharacters are taken literally.

    :param src_node: object whose `docker_id` names the source container
    :param src_path: path inside the source container
    :param dst_node: object whose `docker_id` names the destination container
    :param dst_path: path inside the destination container
    """
    fname = os.path.basename(src_path)
    run_and_check(['docker', 'cp', f"{src_node.docker_id}:{src_path}", f"{self.instances_dir}"])
    run_and_check(['docker', 'cp', f"{self.instances_dir}/{fname}", f"{dst_node.docker_id}:{dst_path}"])
2021-05-21 13:29:43 +00:00
def setup_zookeeper_secure_cmd(self, instance, env_variables, docker_compose_yml_dir):
    """Register docker_compose_zookeeper_secure.yml and fill the ZooKeeper env variables.

    Sets ZOO_SECURE_CLIENT_PORT, ZK_FS and, for nodes 1..3, ZK_DATA<i> /
    ZK_DATA_LOG<i> in `env_variables`; the node directories are also queued in
    `self.zookeeper_dirs_to_create`.

    :param instance: object providing `env_file`
    :param env_variables: dict updated in place
    :param docker_compose_yml_dir: directory that contains the compose file
    :return: the stand-alone docker-compose command, also stored as `self.base_zookeeper_cmd`
    """
    logging.debug('Setup ZooKeeper Secure')
    compose_file = p.join(docker_compose_yml_dir, 'docker_compose_zookeeper_secure.yml')

    env_variables['ZOO_SECURE_CLIENT_PORT'] = str(self.zookeeper_secure_port)
    env_variables['ZK_FS'] = 'bind'
    for node_index in map(str, range(1, 4)):
        node_dir = self.zookeeper_instance_dir_prefix + node_index
        data_dir = os.path.join(node_dir, "data")
        log_dir = os.path.join(node_dir, "log")
        env_variables['ZK_DATA' + node_index] = data_dir
        env_variables['ZK_DATA_LOG' + node_index] = log_dir
        self.zookeeper_dirs_to_create += [data_dir, log_dir]
        logging.debug(f"DEBUG ZK: {self.zookeeper_dirs_to_create}")

    self.with_zookeeper_secure = True
    self.base_cmd.extend(['--file', compose_file])
    self.base_zookeeper_cmd = [
        'docker-compose',
        '--env-file', instance.env_file,
        '--project-name', self.project_name,
        '--file', compose_file,
    ]
    return self.base_zookeeper_cmd
def setup_zookeeper_cmd(self, instance, env_variables, docker_compose_yml_dir):
    """Register docker_compose_zookeeper.yml and fill the ZooKeeper env variables.

    Sets ZK_FS and, for nodes 1..3, ZK_DATA<i> / ZK_DATA_LOG<i> in
    `env_variables`; the node directories are also queued in
    `self.zookeeper_dirs_to_create`.

    :param instance: object providing `env_file`
    :param env_variables: dict updated in place
    :param docker_compose_yml_dir: directory that contains the compose file
    :return: the stand-alone docker-compose command, also stored as `self.base_zookeeper_cmd`
    """
    logging.debug('Setup ZooKeeper')
    compose_file = p.join(docker_compose_yml_dir, 'docker_compose_zookeeper.yml')

    env_variables['ZK_FS'] = 'bind'
    for node_index in map(str, range(1, 4)):
        node_dir = self.zookeeper_instance_dir_prefix + node_index
        data_dir = os.path.join(node_dir, "data")
        log_dir = os.path.join(node_dir, "log")
        env_variables['ZK_DATA' + node_index] = data_dir
        env_variables['ZK_DATA_LOG' + node_index] = log_dir
        self.zookeeper_dirs_to_create += [data_dir, log_dir]
        logging.debug(f"DEBUG ZK: {self.zookeeper_dirs_to_create}")

    self.with_zookeeper = True
    self.base_cmd.extend(['--file', compose_file])
    self.base_zookeeper_cmd = [
        'docker-compose',
        '--env-file', instance.env_file,
        '--project-name', self.project_name,
        '--file', compose_file,
    ]
    return self.base_zookeeper_cmd
def setup_keeper_cmd(self, instance, env_variables, docker_compose_yml_dir):
    """Register docker_compose_keeper.yml and fill the Keeper env variables.

    The keeper binary is `self.server_bin_path` with a trailing '-server'
    removed. For nodes 1..3 the log, config and coordination directories are
    exported through `env_variables` and queued in `self.zookeeper_dirs_to_create`.

    :param instance: object providing `env_file`
    :param env_variables: dict updated in place
    :param docker_compose_yml_dir: directory that contains the compose file
    :return: the stand-alone docker-compose command, also stored as `self.base_zookeeper_cmd`
    """
    logging.debug('Setup Keeper')
    compose_file = p.join(docker_compose_yml_dir, 'docker_compose_keeper.yml')

    suffix = '-server'
    keeper_binary = self.server_bin_path
    if keeper_binary.endswith(suffix):
        keeper_binary = keeper_binary[:len(keeper_binary) - len(suffix)]

    env_variables['keeper_binary'] = keeper_binary
    env_variables['image'] = "clickhouse/integration-test:" + self.docker_base_tag
    env_variables['user'] = str(os.getuid())
    env_variables['keeper_fs'] = 'bind'
    for i in range(1, 4):
        node_dir = f"{self.keeper_instance_dir_prefix}{i}"
        node_dirs = {
            f'keeper_logs_dir{i}': os.path.join(node_dir, "log"),
            f'keeper_config_dir{i}': os.path.join(node_dir, "config"),
            f'keeper_db_dir{i}': os.path.join(node_dir, "coordination"),
        }
        env_variables.update(node_dirs)
        self.zookeeper_dirs_to_create += list(node_dirs.values())
    logging.debug(f"DEBUG KEEPER: {self.zookeeper_dirs_to_create}")

    self.with_zookeeper = True
    self.base_cmd.extend(['--file', compose_file])
    self.base_zookeeper_cmd = [
        'docker-compose',
        '--env-file', instance.env_file,
        '--project-name', self.project_name,
        '--file', compose_file,
    ]
    return self.base_zookeeper_cmd
2021-04-13 14:55:31 +00:00
def setup_mysql_client_cmd(self, instance, env_variables, docker_compose_yml_dir):
    """Register docker_compose_mysql_client.yml; `env_variables` is not modified.

    :param instance: object providing `env_file`
    :param env_variables: unused here
    :param docker_compose_yml_dir: directory that contains the compose file
    :return: the stand-alone docker-compose command, also stored as `self.base_mysql_client_cmd`
    """
    compose_file = p.join(docker_compose_yml_dir, 'docker_compose_mysql_client.yml')
    self.with_mysql_client = True
    self.base_cmd.extend(['--file', compose_file])
    self.base_mysql_client_cmd = [
        'docker-compose',
        '--env-file', instance.env_file,
        '--project-name', self.project_name,
        '--file', compose_file,
    ]
    return self.base_mysql_client_cmd
2021-02-15 09:35:45 +00:00
def setup_mysql_cmd(self, instance, env_variables, docker_compose_yml_dir):
    """Register docker_compose_mysql.yml and export the MYSQL_* env variables.

    :param instance: object providing `env_file`
    :param env_variables: dict updated in place
    :param docker_compose_yml_dir: directory that contains the compose file
    :return: the stand-alone docker-compose command, also stored as `self.base_mysql_cmd`
    """
    compose_file = p.join(docker_compose_yml_dir, 'docker_compose_mysql.yml')
    self.with_mysql = True
    env_variables.update({
        'MYSQL_HOST': self.mysql_host,
        'MYSQL_PORT': str(self.mysql_port),
        'MYSQL_ROOT_HOST': '%',
        'MYSQL_LOGS': self.mysql_logs_dir,
        'MYSQL_LOGS_FS': "bind",
    })
    self.base_cmd.extend(['--file', compose_file])
    self.base_mysql_cmd = [
        'docker-compose',
        '--env-file', instance.env_file,
        '--project-name', self.project_name,
        '--file', compose_file,
    ]
    return self.base_mysql_cmd
2021-02-16 07:10:01 +00:00
def setup_mysql8_cmd(self, instance, env_variables, docker_compose_yml_dir):
    """Register docker_compose_mysql_8_0.yml and export the MYSQL8_* env variables.

    :param instance: object providing `env_file`
    :param env_variables: dict updated in place
    :param docker_compose_yml_dir: directory that contains the compose file
    :return: the stand-alone docker-compose command, also stored as `self.base_mysql8_cmd`
    """
    compose_file = p.join(docker_compose_yml_dir, 'docker_compose_mysql_8_0.yml')
    self.with_mysql8 = True
    env_variables.update({
        'MYSQL8_HOST': self.mysql8_host,
        'MYSQL8_PORT': str(self.mysql8_port),
        'MYSQL8_ROOT_HOST': '%',
        'MYSQL8_LOGS': self.mysql8_logs_dir,
        'MYSQL8_LOGS_FS': "bind",
    })
    self.base_cmd.extend(['--file', compose_file])
    self.base_mysql8_cmd = [
        'docker-compose',
        '--env-file', instance.env_file,
        '--project-name', self.project_name,
        '--file', compose_file,
    ]
    return self.base_mysql8_cmd
2021-06-13 12:56:22 +00:00
def setup_mysql_cluster_cmd(self, instance, env_variables, docker_compose_yml_dir):
    """Register docker_compose_mysql_cluster.yml and export the MYSQL_CLUSTER_* env variables.

    The cluster port is taken from `self.mysql_port`.

    :param instance: object providing `env_file`
    :param env_variables: dict updated in place
    :param docker_compose_yml_dir: directory that contains the compose file
    :return: the stand-alone docker-compose command, also stored as `self.base_mysql_cluster_cmd`
    """
    compose_file = p.join(docker_compose_yml_dir, 'docker_compose_mysql_cluster.yml')
    self.with_mysql_cluster = True
    env_variables.update({
        'MYSQL_CLUSTER_PORT': str(self.mysql_port),
        'MYSQL_CLUSTER_ROOT_HOST': '%',
        'MYSQL_CLUSTER_LOGS': self.mysql_cluster_logs_dir,
        'MYSQL_CLUSTER_LOGS_FS': "bind",
    })
    self.base_cmd.extend(['--file', compose_file])
    self.base_mysql_cluster_cmd = [
        'docker-compose',
        '--env-file', instance.env_file,
        '--project-name', self.project_name,
        '--file', compose_file,
    ]
    return self.base_mysql_cluster_cmd
def setup_postgres_cmd(self, instance, env_variables, docker_compose_yml_dir):
    """Register docker_compose_postgres.yml and export the POSTGRES_* env variables.

    :param instance: object providing `env_file`
    :param env_variables: dict updated in place
    :param docker_compose_yml_dir: directory that contains the compose file
    :return: the stand-alone docker-compose command, also stored as `self.base_postgres_cmd`
    """
    compose_file = p.join(docker_compose_yml_dir, 'docker_compose_postgres.yml')
    self.base_cmd.extend(['--file', compose_file])
    env_variables.update({
        'POSTGRES_PORT': str(self.postgres_port),
        'POSTGRES_DIR': self.postgres_logs_dir,
        'POSTGRES_LOGS_FS': "bind",
    })
    self.with_postgres = True
    self.base_postgres_cmd = [
        'docker-compose',
        '--env-file', instance.env_file,
        '--project-name', self.project_name,
        '--file', compose_file,
    ]
    return self.base_postgres_cmd
2021-06-13 12:56:22 +00:00
def setup_postgres_cluster_cmd(self, instance, env_variables, docker_compose_yml_dir):
    """Register docker_compose_postgres_cluster.yml and export the POSTGRES* env variables.

    Like the other setup_*_cmd methods, the stand-alone command is returned
    in addition to being stored on the cluster.

    :param instance: object providing `env_file`
    :param env_variables: dict updated in place
    :param docker_compose_yml_dir: directory that contains the compose file
    :return: the stand-alone docker-compose command, also stored as `self.base_postgres_cluster_cmd`
    """
    compose_file = p.join(docker_compose_yml_dir, 'docker_compose_postgres_cluster.yml')
    self.with_postgres_cluster = True
    env_variables['POSTGRES_PORT'] = str(self.postgres_port)
    env_variables['POSTGRES2_DIR'] = self.postgres2_logs_dir
    env_variables['POSTGRES3_DIR'] = self.postgres3_logs_dir
    env_variables['POSTGRES4_DIR'] = self.postgres4_logs_dir
    env_variables['POSTGRES_LOGS_FS'] = "bind"
    self.base_cmd.extend(['--file', compose_file])
    self.base_postgres_cluster_cmd = ['docker-compose', '--env-file', instance.env_file, '--project-name', self.project_name,
                                      '--file', compose_file]
    return self.base_postgres_cluster_cmd
2021-02-18 21:21:50 +00:00
def setup_hdfs_cmd(self, instance, env_variables, docker_compose_yml_dir):
    """Register docker_compose_hdfs.yml and export the HDFS_* env variables.

    :param instance: object providing `env_file`
    :param env_variables: dict updated in place
    :param docker_compose_yml_dir: directory that contains the compose file
    :return: the stand-alone docker-compose command, also stored as `self.base_hdfs_cmd`
    """
    compose_file = p.join(docker_compose_yml_dir, 'docker_compose_hdfs.yml')
    self.with_hdfs = True
    env_variables['HDFS_HOST'] = self.hdfs_host
    env_variables['HDFS_NAME_PORT'] = str(self.hdfs_name_port)
    env_variables['HDFS_DATA_PORT'] = str(self.hdfs_data_port)
    env_variables['HDFS_LOGS'] = self.hdfs_logs_dir
    env_variables['HDFS_FS'] = "bind"
    self.base_cmd.extend(['--file', compose_file])
    self.base_hdfs_cmd = ['docker-compose', '--env-file', instance.env_file, '--project-name', self.project_name,
                          '--file', compose_file]
    # Must be an f-string; otherwise the placeholder text itself is logged.
    logging.debug(f"HDFS BASE CMD:{self.base_hdfs_cmd}")
    return self.base_hdfs_cmd
2021-02-19 14:42:43 +00:00
def setup_kerberized_hdfs_cmd(self, instance, env_variables, docker_compose_yml_dir):
    """Register docker_compose_kerberized_hdfs.yml and export the KERBERIZED_HDFS_* env variables.

    KERBERIZED_HDFS_DIR is `instance.path` with a trailing '/'.

    :param instance: object providing `env_file` and `path`
    :param env_variables: dict updated in place
    :param docker_compose_yml_dir: directory that contains the compose file
    :return: the stand-alone docker-compose command, also stored as `self.base_kerberized_hdfs_cmd`
    """
    compose_file = p.join(docker_compose_yml_dir, 'docker_compose_kerberized_hdfs.yml')
    self.with_kerberized_hdfs = True
    env_variables.update({
        'KERBERIZED_HDFS_HOST': self.hdfs_kerberized_host,
        'KERBERIZED_HDFS_NAME_PORT': str(self.hdfs_kerberized_name_port),
        'KERBERIZED_HDFS_DATA_PORT': str(self.hdfs_kerberized_data_port),
        'KERBERIZED_HDFS_LOGS': self.hdfs_kerberized_logs_dir,
        'KERBERIZED_HDFS_FS': "bind",
        'KERBERIZED_HDFS_DIR': instance.path + '/',
    })
    self.base_cmd.extend(['--file', compose_file])
    self.base_kerberized_hdfs_cmd = [
        'docker-compose',
        '--env-file', instance.env_file,
        '--project-name', self.project_name,
        '--file', compose_file,
    ]
    return self.base_kerberized_hdfs_cmd
2021-02-18 12:57:45 +00:00
def setup_kafka_cmd(self, instance, env_variables, docker_compose_yml_dir):
    """Register docker_compose_kafka.yml and export the Kafka and schema-registry env variables.

    :param instance: object providing `env_file`
    :param env_variables: dict updated in place
    :param docker_compose_yml_dir: directory that contains the compose file
    :return: the stand-alone docker-compose command, also stored as `self.base_kafka_cmd`
    """
    compose_file = p.join(docker_compose_yml_dir, 'docker_compose_kafka.yml')
    self.with_kafka = True
    env_variables.update({
        'KAFKA_HOST': self.kafka_host,
        'KAFKA_EXTERNAL_PORT': str(self.kafka_port),
        'SCHEMA_REGISTRY_EXTERNAL_PORT': str(self.schema_registry_port),
        'SCHEMA_REGISTRY_INTERNAL_PORT': "8081",
    })
    self.base_cmd.extend(['--file', compose_file])
    self.base_kafka_cmd = [
        'docker-compose',
        '--env-file', instance.env_file,
        '--project-name', self.project_name,
        '--file', compose_file,
    ]
    return self.base_kafka_cmd
2021-02-18 12:57:45 +00:00
def setup_kerberized_kafka_cmd(self, instance, env_variables, docker_compose_yml_dir):
    """Register docker_compose_kerberized_kafka.yml and export the KERBERIZED_KAFKA_* env variables.

    KERBERIZED_KAFKA_DIR is `instance.path` with a trailing '/'.

    :param instance: object providing `env_file` and `path`
    :param env_variables: dict updated in place
    :param docker_compose_yml_dir: directory that contains the compose file
    :return: the stand-alone docker-compose command, also stored as `self.base_kerberized_kafka_cmd`
    """
    compose_file = p.join(docker_compose_yml_dir, 'docker_compose_kerberized_kafka.yml')
    self.with_kerberized_kafka = True
    env_variables.update({
        'KERBERIZED_KAFKA_DIR': instance.path + '/',
        'KERBERIZED_KAFKA_HOST': self.kerberized_kafka_host,
        'KERBERIZED_KAFKA_EXTERNAL_PORT': str(self.kerberized_kafka_port),
    })
    self.base_cmd.extend(['--file', compose_file])
    self.base_kerberized_kafka_cmd = [
        'docker-compose',
        '--env-file', instance.env_file,
        '--project-name', self.project_name,
        '--file', compose_file,
    ]
    return self.base_kerberized_kafka_cmd
2021-02-18 12:57:45 +00:00
2021-02-16 14:16:15 +00:00
def setup_redis_cmd(self, instance, env_variables, docker_compose_yml_dir):
    """Register docker_compose_redis.yml and export the REDIS_* env variables.

    :param instance: object providing `env_file`
    :param env_variables: dict updated in place
    :param docker_compose_yml_dir: directory that contains the compose file
    :return: the stand-alone docker-compose command, also stored as `self.base_redis_cmd`
    """
    compose_file = p.join(docker_compose_yml_dir, 'docker_compose_redis.yml')
    self.with_redis = True
    env_variables.update({
        'REDIS_HOST': self.redis_host,
        'REDIS_EXTERNAL_PORT': str(self.redis_port),
        'REDIS_INTERNAL_PORT': "6379",
    })
    self.base_cmd.extend(['--file', compose_file])
    self.base_redis_cmd = [
        'docker-compose',
        '--env-file', instance.env_file,
        '--project-name', self.project_name,
        '--file', compose_file,
    ]
    return self.base_redis_cmd
def setup_rabbitmq_cmd(self, instance, env_variables, docker_compose_yml_dir):
    """Enable the RabbitMQ service for this cluster.

    Exports host, port and log-directory settings into ``env_variables``,
    appends the RabbitMQ compose file to ``self.base_cmd`` and returns (and
    stores on ``self``) a standalone docker-compose command prefix.
    """
    compose_file = p.join(docker_compose_yml_dir, 'docker_compose_rabbitmq.yml')

    self.with_rabbitmq = True
    env_variables.update({
        'RABBITMQ_HOST': self.rabbitmq_host,
        'RABBITMQ_PORT': str(self.rabbitmq_port),
        'RABBITMQ_LOGS': self.rabbitmq_logs_dir,
        'RABBITMQ_LOGS_FS': "bind",
    })
    self.base_cmd.extend(['--file', compose_file])

    self.base_rabbitmq_cmd = [
        'docker-compose', '--env-file', instance.env_file,
        '--project-name', self.project_name,
        '--file', compose_file,
    ]
    return self.base_rabbitmq_cmd
2021-07-27 15:54:13 +00:00
def setup_mongo_secure_cmd(self, instance, env_variables, docker_compose_yml_dir):
    """Enable the TLS-enabled MongoDB service for this cluster.

    Sets both ``with_mongo`` and ``with_mongo_secure``, exports the Mongo
    settings (including ``MONGO_CONFIG_PATH`` pointing at the helpers
    directory) into ``env_variables``, appends the secure compose file to
    ``self.base_cmd`` and returns (and stores as ``self.base_mongo_cmd``) a
    standalone docker-compose command prefix.
    """
    compose_file = p.join(docker_compose_yml_dir, 'docker_compose_mongo_secure.yml')

    self.with_mongo = True
    self.with_mongo_secure = True
    env_variables.update({
        'MONGO_HOST': self.mongo_host,
        'MONGO_EXTERNAL_PORT': str(self.mongo_port),
        'MONGO_INTERNAL_PORT': "27017",
        'MONGO_CONFIG_PATH': HELPERS_DIR,
    })
    self.base_cmd.extend(['--file', compose_file])

    self.base_mongo_cmd = [
        'docker-compose', '--env-file', instance.env_file,
        '--project-name', self.project_name,
        '--file', compose_file,
    ]
    return self.base_mongo_cmd
2021-02-18 21:21:50 +00:00
def setup_mongo_cmd(self, instance, env_variables, docker_compose_yml_dir):
    """Enable the (non-TLS) MongoDB service for this cluster.

    Exports host/port settings into ``env_variables``, appends the Mongo
    compose file to ``self.base_cmd`` and returns (and stores on ``self``) a
    standalone docker-compose command prefix for Mongo.
    """
    compose_file = p.join(docker_compose_yml_dir, 'docker_compose_mongo.yml')

    self.with_mongo = True
    env_variables.update({
        'MONGO_HOST': self.mongo_host,
        'MONGO_EXTERNAL_PORT': str(self.mongo_port),
        'MONGO_INTERNAL_PORT': "27017",
    })
    self.base_cmd.extend(['--file', compose_file])

    self.base_mongo_cmd = [
        'docker-compose', '--env-file', instance.env_file,
        '--project-name', self.project_name,
        '--file', compose_file,
    ]
    return self.base_mongo_cmd
2021-02-19 12:58:11 +00:00
def setup_minio_cmd(self, instance, env_variables, docker_compose_yml_dir):
    """Enable the Minio (S3) service for this cluster.

    Exports the certificate directory, port and public certificate path into
    ``env_variables``, appends the Minio compose file to ``self.base_cmd``
    and returns (and stores on ``self``) a standalone docker-compose command
    prefix for Minio.
    """
    compose_file = p.join(docker_compose_yml_dir, 'docker_compose_minio.yml')

    self.with_minio = True
    certs_dir = p.join(self.minio_dir, "certs")
    env_variables.update({
        'MINIO_CERTS_DIR': certs_dir,
        'MINIO_PORT': str(self.minio_port),
        'SSL_CERT_FILE': p.join(self.base_dir, certs_dir, 'public.crt'),
    })
    self.base_cmd.extend(['--file', compose_file])

    self.base_minio_cmd = [
        'docker-compose', '--env-file', instance.env_file,
        '--project-name', self.project_name,
        '--file', compose_file,
    ]
    return self.base_minio_cmd
def setup_cassandra_cmd(self, instance, env_variables, docker_compose_yml_dir):
    """Enable the Cassandra service for this cluster.

    Exports the Cassandra port into ``env_variables``, appends the Cassandra
    compose file to ``self.base_cmd`` and returns (and stores on ``self``) a
    standalone docker-compose command prefix for Cassandra.
    """
    compose_file = p.join(docker_compose_yml_dir, 'docker_compose_cassandra.yml')

    self.with_cassandra = True
    env_variables['CASSANDRA_PORT'] = str(self.cassandra_port)
    self.base_cmd.extend(['--file', compose_file])

    self.base_cassandra_cmd = [
        'docker-compose', '--env-file', instance.env_file,
        '--project-name', self.project_name,
        '--file', compose_file,
    ]
    return self.base_cassandra_cmd
2021-06-07 12:56:29 +00:00
def setup_jdbc_bridge_cmd(self, instance, env_variables, docker_compose_yml_dir):
    """Enable the JDBC bridge service for this cluster.

    Exports the driver log directory settings into ``env_variables``, appends
    the JDBC bridge compose file to ``self.base_cmd`` and returns (and stores
    on ``self``) a standalone docker-compose command prefix.
    """
    compose_file = p.join(docker_compose_yml_dir, 'docker_compose_jdbc_bridge.yml')

    self.with_jdbc_bridge = True
    env_variables.update({
        'JDBC_DRIVER_LOGS': self.jdbc_driver_logs_dir,
        'JDBC_DRIVER_FS': "bind",
    })
    self.base_cmd.extend(['--file', compose_file])

    self.base_jdbc_bridge_cmd = [
        'docker-compose', '--env-file', instance.env_file,
        '--project-name', self.project_name,
        '--file', compose_file,
    ]
    return self.base_jdbc_bridge_cmd
2021-06-13 12:56:22 +00:00
def setup_nginx_cmd(self, instance, env_variables, docker_compose_yml_dir):
    """Enable the nginx service for this cluster.

    Appends the nginx compose file to ``self.base_cmd`` and returns (and
    stores on ``self``) a standalone docker-compose command prefix for nginx.
    ``env_variables`` is accepted for signature parity and left untouched.
    """
    compose_file = p.join(docker_compose_yml_dir, 'docker_compose_nginx.yml')

    self.with_nginx = True
    self.base_cmd.extend(['--file', compose_file])

    self.base_nginx_cmd = [
        'docker-compose', '--env-file', instance.env_file,
        '--project-name', self.project_name,
        '--file', compose_file,
    ]
    return self.base_nginx_cmd
def add_instance(self, name, base_config_dir=None, main_configs=None, user_configs=None, dictionaries=None,
                 macros=None, with_zookeeper=False, with_zookeeper_secure=False,
                 with_mysql_client=False, with_mysql=False, with_mysql8=False, with_mysql_cluster=False,
                 with_kafka=False, with_kerberized_kafka=False, with_rabbitmq=False, clickhouse_path_dir=None,
                 with_odbc_drivers=False, with_postgres=False, with_postgres_cluster=False, with_hdfs=False,
                 with_kerberized_hdfs=False, with_mongo=False, with_mongo_secure=False, with_nginx=False,
                 with_redis=False, with_minio=False, with_cassandra=False, with_jdbc_bridge=False,
                 hostname=None, env_variables=None, image="clickhouse/integration-test", tag=None,
                 stay_alive=False, ipv4_address=None, ipv6_address=None, with_installed_binary=False, external_dirs=None, tmpfs=None,
                 zookeeper_docker_compose_path=None, minio_certs_dir=None, use_keeper=True,
                 main_config_name="config.xml", users_config_name="users.xml", copy_common_configs=True, config_root_name="clickhouse"):
    """Add an instance to the cluster.

    name - the name of the instance directory and the value of the 'instance' macro in ClickHouse.
    base_config_dir - a directory with config.xml and users.xml files which will be copied to /etc/clickhouse-server/ directory
    main_configs - a list of config files that will be added to config.d/ directory
    user_configs - a list of config files that will be added to users.d/ directory
    with_zookeeper - if True, add ZooKeeper configuration to configs and ZooKeeper instances to the cluster.
    with_zookeeper_secure - if True, add ZooKeeper Secure configuration to configs and ZooKeeper instances to the cluster.

    Every other ``with_*`` flag enables the matching external service the
    first time it is requested: the corresponding ``setup_*_cmd`` method is
    called once per cluster. Raises ``Exception`` when the cluster is already
    up, when ``name`` is already registered, or when ``minio_certs_dir`` is
    passed while one is already set. Returns the created instance.
    """
    if self.is_up:
        raise Exception("Can't add instance %s: cluster is already up!" % name)
    if name in self.instances:
        raise Exception("Can't add instance `%s': there is already an instance with the same name!" % name)

    tag = self.docker_base_tag if tag is None else tag
    env_variables = env_variables or {}
    self.use_keeper = use_keeper

    # Code coverage files will be placed in database directory
    # (affect only WITH_COVERAGE=1 build)
    env_variables['LLVM_PROFILE_FILE'] = '/var/lib/clickhouse/server_%h_%p_%m.profraw'

    instance = ClickHouseInstance(
        cluster=self,
        base_path=self.base_dir,
        name=name,
        base_config_dir=base_config_dir or self.base_config_dir,
        custom_main_configs=main_configs or [],
        custom_user_configs=user_configs or [],
        custom_dictionaries=dictionaries or [],
        macros=macros or {},
        with_zookeeper=with_zookeeper,
        zookeeper_config_path=self.zookeeper_config_path,
        with_mysql_client=with_mysql_client,
        with_mysql=with_mysql,
        with_mysql8=with_mysql8,
        with_mysql_cluster=with_mysql_cluster,
        with_kafka=with_kafka,
        with_kerberized_kafka=with_kerberized_kafka,
        with_rabbitmq=with_rabbitmq,
        with_nginx=with_nginx,
        with_kerberized_hdfs=with_kerberized_hdfs,
        with_mongo=with_mongo or with_mongo_secure,
        with_redis=with_redis,
        with_minio=with_minio,
        with_cassandra=with_cassandra,
        with_jdbc_bridge=with_jdbc_bridge,
        server_bin_path=self.server_bin_path,
        odbc_bridge_bin_path=self.odbc_bridge_bin_path,
        library_bridge_bin_path=self.library_bridge_bin_path,
        clickhouse_path_dir=clickhouse_path_dir,
        with_odbc_drivers=with_odbc_drivers,
        with_postgres=with_postgres,
        with_postgres_cluster=with_postgres_cluster,
        hostname=hostname,
        env_variables=env_variables,
        image=image,
        tag=tag,
        stay_alive=stay_alive,
        ipv4_address=ipv4_address,
        ipv6_address=ipv6_address,
        with_installed_binary=with_installed_binary,
        main_config_name=main_config_name,
        users_config_name=users_config_name,
        copy_common_configs=copy_common_configs,
        external_dirs=external_dirs,
        tmpfs=tmpfs or [],
        config_root_name=config_root_name)

    docker_compose_yml_dir = get_docker_compose_path()
    net_compose_file = p.join(docker_compose_yml_dir, 'docker_compose_net.yml')

    self.instances[name] = instance
    if not (ipv4_address is None and ipv6_address is None):
        self.with_net_trics = True
        self.base_cmd.extend(['--file', net_compose_file])

    self.base_cmd.extend(['--file', instance.docker_compose_path])

    cmds = []

    def enable(setup_cmd):
        # Run one service setup hook and remember the command prefix it returns.
        cmds.append(setup_cmd(instance, env_variables, docker_compose_yml_dir))

    if with_zookeeper_secure and not self.with_zookeeper_secure:
        enable(self.setup_zookeeper_secure_cmd)

    if with_zookeeper and not self.with_zookeeper:
        enable(self.setup_keeper_cmd if self.use_keeper else self.setup_zookeeper_cmd)

    if with_mysql_client and not self.with_mysql_client:
        enable(self.setup_mysql_client_cmd)

    if with_mysql and not self.with_mysql:
        enable(self.setup_mysql_cmd)

    if with_mysql8 and not self.with_mysql8:
        enable(self.setup_mysql8_cmd)

    if with_mysql_cluster and not self.with_mysql_cluster:
        enable(self.setup_mysql_cluster_cmd)

    if with_postgres and not self.with_postgres:
        enable(self.setup_postgres_cmd)

    if with_postgres_cluster and not self.with_postgres_cluster:
        enable(self.setup_postgres_cluster_cmd)

    if with_odbc_drivers and not self.with_odbc_drivers:
        # ODBC drivers need both MySQL and Postgres; start whichever is missing.
        self.with_odbc_drivers = True
        if not self.with_mysql:
            enable(self.setup_mysql_cmd)
        if not self.with_postgres:
            enable(self.setup_postgres_cmd)

    if with_kafka and not self.with_kafka:
        enable(self.setup_kafka_cmd)

    if with_kerberized_kafka and not self.with_kerberized_kafka:
        enable(self.setup_kerberized_kafka_cmd)

    if with_rabbitmq and not self.with_rabbitmq:
        enable(self.setup_rabbitmq_cmd)

    if with_nginx and not self.with_nginx:
        enable(self.setup_nginx_cmd)

    if with_hdfs and not self.with_hdfs:
        enable(self.setup_hdfs_cmd)

    if with_kerberized_hdfs and not self.with_kerberized_hdfs:
        enable(self.setup_kerberized_hdfs_cmd)

    if (with_mongo or with_mongo_secure) and not (self.with_mongo or self.with_mongo_secure):
        enable(self.setup_mongo_secure_cmd if with_mongo_secure else self.setup_mongo_cmd)

    # Only the commands collected up to this point receive the net compose
    # file; services enabled below are appended afterwards, as in the
    # statement order this method has always used.
    if self.with_net_trics:
        for cmd in cmds:
            cmd.extend(['--file', net_compose_file])

    if with_redis and not self.with_redis:
        enable(self.setup_redis_cmd)

    if with_minio and not self.with_minio:
        enable(self.setup_minio_cmd)

    if minio_certs_dir is not None:
        if self.minio_certs_dir is not None:
            raise Exception("Overwriting minio certs dir")
        self.minio_certs_dir = minio_certs_dir

    if with_cassandra and not self.with_cassandra:
        enable(self.setup_cassandra_cmd)

    if with_jdbc_bridge and not self.with_jdbc_bridge:
        enable(self.setup_jdbc_bridge_cmd)

    logging.debug("Cluster name:{} project_name:{}. Added instance name:{} tag:{} base_cmd:{} docker_compose_yml_dir:{}".format(
        self.name, self.project_name, name, tag, self.base_cmd, docker_compose_yml_dir))
    return instance
def get_instance_docker_id(self, instance_name):
    """Return the container name docker-compose gives to ``instance_name``.

    The name has the form ``<project_name>_<instance_name>_1``.
    """
    # According to how docker-compose names containers.
    return '_'.join([self.project_name, instance_name, '1'])
2019-06-04 20:59:31 +00:00
def _replace(self, path, what, to):
    """Rewrite the file at ``path`` with every ``what`` replaced by ``to``."""
    with open(path, 'r') as source:
        original = source.read()
    updated = original.replace(what, to)
    with open(path, 'w') as target:
        target.write(updated)
def restart_instance_with_ip_change(self, node, new_ip):
    """Recreate ``node``'s container with a new IP address and wait for it.

    The node's docker-compose file is rewritten so the old address of the
    matching family is replaced by ``new_ip``; the container is then stopped,
    removed and recreated, and the node's ``ip_address`` and ``client`` are
    refreshed.

    node - the instance to restart; it must have been created with an address
           of the same family as ``new_ip``.
    new_ip - the new IPv4 or IPv6 address, as a string.

    Raises ``Exception`` if the node has no address of the matching family.
    Returns ``node``.
    """
    # Any colon marks an IPv6 literal (dotted IPv4 never contains one). The
    # previous '::' check missed uncompressed addresses like 2001:db8:0:0:0:0:0:1.
    if ':' in new_ip:
        if node.ipv6_address is None:
            raise Exception("You should specify ipv6_address in add_instance method")
        self._replace(node.docker_compose_path, node.ipv6_address, new_ip)
        node.ipv6_address = new_ip
    else:
        if node.ipv4_address is None:
            raise Exception("You should specify ipv4_address in add_instance method")
        self._replace(node.docker_compose_path, node.ipv4_address, new_ip)
        node.ipv4_address = new_ip

    run_and_check(self.base_cmd + ["stop", node.name])
    run_and_check(self.base_cmd + ["rm", "--force", "--stop", node.name])
    run_and_check(self.base_cmd + ["up", "--force-recreate", "--no-deps", "-d", node.name])

    node.ip_address = self.get_instance_ip(node.name)
    node.client = Client(node.ip_address, command=self.client_bin_path)

    logging.info("Restart node with ip change")
    # In builds with sanitizer the server can take a long time to start
    node.wait_for_start(start_timeout=180.0, connection_timeout=600.0)  # seconds
    res = node.client.query("SELECT 30")
    logging.debug(f"Read '{res}'")
    assert "30\n" == res
    logging.info("Restarted")

    return node
2021-02-16 07:10:01 +00:00
def restart_service(self, service_name):
    """Restart ``service_name`` through the cluster's docker-compose command."""
    restart_cmd = self.base_cmd + ["restart", service_name]
    run_and_check(restart_cmd)
def get_instance_ip(self, instance_name):
    """Return the IP address of the container running ``instance_name``.

    The address is taken from the first network listed in the container's
    ``NetworkSettings``.
    """
    logging.debug("get_instance_ip instance_name={}".format(instance_name))
    container_name = self.get_instance_docker_id(instance_name)
    container = self.docker_client.containers.get(container_name)
    networks = container.attrs['NetworkSettings']['Networks']
    return list(networks.values())[0]['IPAddress']
def get_container_id(self, instance_name):
    """Return the identifier used to address ``instance_name``'s container.

    This is the docker-compose container name from
    ``get_instance_docker_id``, not the hash id reported by the Docker daemon.
    """
    container_name = self.get_instance_docker_id(instance_name)
    return container_name
def get_container_logs(self, instance_name):
    """Return the decoded Docker logs of ``instance_name``'s container."""
    raw_logs = self.docker_client.api.logs(self.get_container_id(instance_name))
    return raw_logs.decode()
2021-06-01 09:53:36 +00:00
def exec_in_container(self, container_id, cmd, detach=False, nothrow=False, use_cli=True, **kwargs):
    """Run ``cmd`` (a list of arguments) inside the container ``container_id``.

    With ``use_cli`` (the default) the command runs through ``docker exec``;
    of ``kwargs`` only ``user`` is honoured on that path. Otherwise the Docker
    API client is used and ``kwargs`` are passed on to ``exec_create``.

    On the API path a non-zero exit code raises ``Exception`` unless
    ``nothrow`` is set, in which case the failure is only logged. The API
    path returns the decoded output, or the raw result when ``detach`` is set.
    """
    if use_cli:
        logging.debug(f"run container_id:{container_id} detach:{detach} nothrow:{nothrow} cmd: {cmd}")
        cli_prefix = ["docker", "exec"]
        if 'user' in kwargs:
            cli_prefix += ['-u', kwargs['user']]
        return subprocess_check_call(cli_prefix + [container_id] + cmd, detach=detach, nothrow=nothrow)

    api = self.docker_client.api
    exec_id = api.exec_create(container_id, cmd, **kwargs)
    output = api.exec_start(exec_id, detach=detach)

    exit_code = api.exec_inspect(exec_id)['ExitCode']
    if exit_code:
        # Dump container and image details to help diagnose the failure.
        container_info = api.inspect_container(container_id)
        image_id = container_info.get('Image')
        image_info = api.inspect_image(image_id)
        logging.debug(("Command failed in container {}: ".format(container_id)))
        pprint.pprint(container_info)
        logging.debug("")
        logging.debug(("Container {} uses image {}: ".format(container_id, image_id)))
        pprint.pprint(image_info)
        logging.debug("")
        message = 'Cmd "{}" failed in container {}. Return code {}. Output: {}'.format(
            ' '.join(cmd), container_id, exit_code, output)
        if not nothrow:
            raise Exception(message)
        logging.debug(message)

    return output if detach else output.decode()
def copy_file_to_container(self, container_id, local_path, dest_path):
    """Copy the local file ``local_path`` to ``dest_path`` inside a container.

    The content is base64-encoded and decoded inside the container by a
    ``bash`` one-liner executed as ``root``.

    container_id - container to copy into.
    local_path - path of the file on the host.
    dest_path - destination path inside the container; it is interpolated
                into the shell command unquoted.
    """
    # Read raw bytes: text mode would fail on non-UTF-8 content and would
    # silently rewrite newlines before the data is encoded.
    with open(local_path, "rb") as fdata:
        data = fdata.read()
    encodedStr = str(base64.b64encode(data), "utf-8")
    self.exec_in_container(container_id,
                           ["bash", "-c", "echo {} | base64 --decode > {}".format(encodedStr, dest_path)],
                           user='root')
2021-06-07 12:56:29 +00:00
def wait_for_url(self, url="http://localhost:8123/ping", conn_timeout=2, interval=2, timeout=60):
    """Poll ``url`` with HTTP GET until it answers with a success status.

    url - address to poll; ``http://`` is prepended when it does not start with 'http'.
    conn_timeout - per-request timeout passed to ``requests.get``.
    interval - pause between attempts in seconds; non-positive values fall back to 2.
    timeout - overall time budget in seconds; non-positive values fall back to 60.

    Raises ``Exception`` when the URL is still unavailable after ``timeout``.
    """
    if not url.startswith('http'):
        url = "http://" + url
    interval = interval if interval > 0 else 2
    timeout = timeout if timeout > 0 else 60

    attempts = 1
    errors = []
    started_at = time.time()
    while time.time() - started_at < timeout:
        try:
            response = requests.get(url, allow_redirects=True, timeout=conn_timeout, verify=False)
            response.raise_for_status()
            logging.debug("{} is available after {} seconds".format(url, time.time() - started_at))
            return
        except Exception as ex:
            logging.debug("{} Attempt {} failed, retrying in {} seconds".format(ex, attempts, interval))
            attempts += 1
            errors.append(str(ex))
            time.sleep(interval)

    # Dump the container list to help diagnose why the URL never came up.
    run_and_check(['docker', 'ps', '--all'])
    logging.error("Can't connect to URL:{}".format(errors))
    raise Exception("Cannot wait URL {}(interval={}, timeout={}, attempts={})".format(
        url, interval, timeout, attempts))
2021-04-13 14:55:31 +00:00
def wait_mysql_client_to_start(self, timeout=180):
    """Wait until the MySQL client container reports a healthy status.

    Looks up the container handle (stored as ``self.mysql_client_container``)
    and polls its inspect data once per second until the health status is
    ``healthy``.

    timeout - overall time budget in seconds.

    Raises ``Exception`` if the container is not healthy within ``timeout``.
    """
    start = time.time()
    errors = []
    self.mysql_client_container = self.get_docker_handle(self.get_instance_docker_id(self.mysql_client_host))

    while time.time() - start < timeout:
        try:
            info = self.mysql_client_container.client.api.inspect_container(self.mysql_client_container.name)
            if info['State']['Health']['Status'] == 'healthy':
                logging.debug("Mysql Client Container Started")
                # Success: return here. Breaking out of the loop instead
                # would fall through to the failure path below.
                return
        except Exception as ex:
            errors.append(str(ex))
        # Not healthy yet (or inspection failed): keep polling.
        time.sleep(1)

    run_and_check(['docker-compose', 'ps', '--services', '--all'])
    logging.error("Can't connect to MySQL Client:{}".format(errors))
    raise Exception("Cannot wait MySQL Client container")
2021-04-13 10:52:22 +00:00
def wait_mysql_to_start(self, timeout=180):
    """Wait until the ``mysql57`` container accepts connections.

    Stores the container address in ``self.mysql_ip`` and retries a root
    connection every half second. Raises ``Exception`` after ``timeout``
    seconds without a successful connection.
    """
    self.mysql_ip = self.get_instance_ip('mysql57')
    began = time.time()
    errors = []
    while time.time() - began < timeout:
        try:
            pymysql.connect(user='root', password='clickhouse', host=self.mysql_ip, port=self.mysql_port).close()
            logging.debug("Mysql Started")
            return
        except Exception as ex:
            errors.append(str(ex))
            time.sleep(0.5)

    run_and_check(['docker-compose', 'ps', '--services', '--all'])
    logging.error("Can't connect to MySQL:{}".format(errors))
    raise Exception("Cannot wait MySQL container")
2021-04-13 16:22:10 +00:00
def wait_mysql8_to_start(self, timeout=180):
    """Wait until the ``mysql80`` container accepts connections.

    Stores the container address in ``self.mysql8_ip`` and retries a root
    connection every half second. Raises ``Exception`` after ``timeout``
    seconds without a successful connection.
    """
    self.mysql8_ip = self.get_instance_ip('mysql80')
    began = time.time()
    while time.time() - began < timeout:
        try:
            pymysql.connect(user='root', password='clickhouse', host=self.mysql8_ip, port=self.mysql8_port).close()
            logging.debug("Mysql 8 Started")
            return
        except Exception as ex:
            logging.debug("Can't connect to MySQL 8 " + str(ex))
            time.sleep(0.5)

    run_and_check(['docker-compose', 'ps', '--services', '--all'])
    raise Exception("Cannot wait MySQL 8 container")
2021-04-13 10:52:22 +00:00
def wait_mysql_cluster_to_start(self, timeout=180):
    """Wait until all three MySQL cluster containers accept connections.

    Stores the container addresses in ``self.mysql2_ip`` .. ``self.mysql4_ip``
    and retries the whole round of connections every half second. Raises
    ``Exception`` after ``timeout`` seconds without a full successful round.
    """
    self.mysql2_ip = self.get_instance_ip(self.mysql2_host)
    self.mysql3_ip = self.get_instance_ip(self.mysql3_host)
    self.mysql4_ip = self.get_instance_ip(self.mysql4_host)
    cluster_ips = [self.mysql2_ip, self.mysql3_ip, self.mysql4_ip]

    began = time.time()
    errors = []
    while time.time() - began < timeout:
        try:
            for ip in cluster_ips:
                pymysql.connect(user='root', password='clickhouse', host=ip, port=self.mysql_port).close()
                logging.debug(f"Mysql Started {ip}")
            return
        except Exception as ex:
            errors.append(str(ex))
            time.sleep(0.5)

    run_and_check(['docker-compose', 'ps', '--services', '--all'])
    logging.error("Can't connect to MySQL:{}".format(errors))
    raise Exception("Cannot wait MySQL container")
2021-07-04 14:56:31 +00:00
def wait_postgres_to_start(self, timeout=260):
    """Wait until the Postgres container accepts connections.

    Stores the container address in ``self.postgres_ip`` and, on success, an
    autocommit connection in ``self.postgres_conn``. Raises ``Exception``
    after ``timeout`` seconds without a successful connection.
    """
    self.postgres_ip = self.get_instance_ip(self.postgres_host)
    began = time.time()
    while time.time() - began < timeout:
        try:
            self.postgres_conn = psycopg2.connect(
                host=self.postgres_ip,
                port=self.postgres_port,
                database='postgres',
                user='postgres',
                password='mysecretpassword')
            self.postgres_conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
            self.postgres_conn.autocommit = True
            logging.debug("Postgres Started")
            return
        except Exception as ex:
            logging.debug("Can't connect to Postgres " + str(ex))
            time.sleep(0.5)

    raise Exception("Cannot wait Postgres container")
2021-04-13 10:52:22 +00:00
def wait_postgres_cluster_to_start(self, timeout=180):
    """Wait until Postgres cluster hosts 2, 3 and 4 accept connections.

    Stores the container addresses in ``self.postgres2_ip`` ..
    ``self.postgres4_ip`` and, for each host that connects, an autocommit
    connection in ``self.postgres<N>_conn``. The hosts are tried in order and
    share one ``timeout`` budget. Raises ``Exception`` when the last host
    could not be reached within that budget.
    """
    self.postgres2_ip = self.get_instance_ip(self.postgres2_host)
    self.postgres3_ip = self.get_instance_ip(self.postgres3_host)
    self.postgres4_ip = self.get_instance_ip(self.postgres4_host)
    hosts = ((2, self.postgres2_ip), (3, self.postgres3_ip), (4, self.postgres4_ip))

    began = time.time()
    connected = False
    for index, ip in hosts:
        connected = False
        while time.time() - began < timeout:
            try:
                conn = psycopg2.connect(host=ip, port=self.postgres_port, database='postgres',
                                        user='postgres', password='mysecretpassword')
                # Publish the connection first so it stays reachable even if
                # configuring it below fails.
                setattr(self, f"postgres{index}_conn", conn)
                conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
                conn.autocommit = True
                logging.debug(f"Postgres Cluster host {index} started")
                connected = True
                break
            except Exception as ex:
                logging.debug(f"Can't connect to Postgres host {index}" + str(ex))
                time.sleep(0.5)

    # `connected` reflects the last host; earlier hosts that ran out of time
    # leave no budget for later ones, so it is False in that case as well.
    if connected:
        return
    raise Exception("Cannot wait Postgres container")
2021-07-27 06:16:40 +00:00
def wait_rabbitmq_to_start(self, timeout=180, throw=True):
    """Wait until RabbitMQ is available and its consistent hash plugin is enabled.

    Stores the container address in ``self.rabbitmq_ip``. Returns True on
    success. On timeout raises ``Exception`` when ``throw`` is set, otherwise
    returns False.
    """
    self.rabbitmq_ip = self.get_instance_ip(self.rabbitmq_host)

    began = time.time()
    while time.time() - began < timeout:
        try:
            available = check_rabbitmq_is_available(self.rabbitmq_docker_id)
            if available:
                logging.debug("RabbitMQ is available")
            if available and enable_consistent_hash_plugin(self.rabbitmq_docker_id):
                logging.debug("RabbitMQ consistent hash plugin is available")
                return True
        except Exception as ex:
            logging.debug("Can't connect to RabbitMQ " + str(ex))
        time.sleep(0.5)

    if throw:
        raise Exception("Cannot wait RabbitMQ container")
    return False
2021-04-30 09:18:12 +00:00
2021-06-13 12:56:22 +00:00
def wait_nginx_to_start(self, timeout=60):
    """Wait until the nginx static files server stores and serves a file.

    Stores the container address in ``self.nginx_ip``, then repeatedly PUTs a
    small file with ``curl`` inside the nginx container and GETs it back until
    the content matches.

    timeout - overall time budget in seconds.

    Returns None both on success and on timeout. No exception is raised on
    timeout, so existing callers keep working; the timeout is logged instead.
    """
    self.nginx_ip = self.get_instance_ip(self.nginx_host)
    start = time.time()
    while time.time() - start < timeout:
        try:
            self.exec_in_container(self.nginx_id, ["curl", "-X", "PUT", "-d", "Test", "http://test.com/test.txt"])
            res = self.exec_in_container(self.nginx_id, ["curl", "-X", "GET", "http://test.com/test.txt"])
            # Explicit check rather than `assert`: asserts vanish under
            # `python -O` and carry no message for the log line below.
            if res != 'Test':
                raise Exception("unexpected response {!r}".format(res))
            logging.debug('nginx static files server is available')
            return
        except Exception as ex:
            logging.debug("Can't connect to nginx: " + str(ex))
            time.sleep(0.5)
    logging.warning("nginx static files server is not available after %s seconds", timeout)
2021-05-21 13:29:43 +00:00
def wait_zookeeper_secure_to_start(self, timeout=20):
    """Wait until zoo1, zoo2 and zoo3 (secure ZooKeeper) answer a listing of '/'.

    Retries the whole round every half second. Raises ``Exception`` after
    ``timeout`` seconds without a full successful round.
    """
    logging.debug("Wait ZooKeeper Secure to start")
    began = time.time()
    while time.time() - began < timeout:
        try:
            for zoo_name in ('zoo1', 'zoo2', 'zoo3'):
                client = self.get_kazoo_client(zoo_name)
                client.get_children('/')
                client.stop()
            logging.debug("All instances of ZooKeeper Secure started")
            return
        except Exception as ex:
            logging.debug("Can't connect to ZooKeeper secure " + str(ex))
            time.sleep(0.5)

    raise Exception("Cannot wait ZooKeeper secure container")
2021-04-13 10:52:22 +00:00
def wait_zookeeper_to_start(self, timeout=180):
    """Wait until zoo1, zoo2 and zoo3 answer a listing of '/'.

    Retries the whole round every half second. Raises ``Exception`` after
    ``timeout`` seconds without a full successful round.
    """
    logging.debug("Wait ZooKeeper to start")
    began = time.time()
    while time.time() - began < timeout:
        try:
            for zoo_name in ('zoo1', 'zoo2', 'zoo3'):
                client = self.get_kazoo_client(zoo_name)
                client.get_children('/')
                client.stop()
            logging.debug("All instances of ZooKeeper started")
            return
        except Exception as ex:
            logging.debug("Can't connect to ZooKeeper " + str(ex))
            time.sleep(0.5)

    raise Exception("Cannot wait ZooKeeper container")
2021-04-13 16:22:10 +00:00
def make_hdfs_api(self, timeout=180, kerberized=False):
    """Create ``self.hdfs_api`` for the plain or the kerberized HDFS service.

    timeout - passed to ``HDFSApi`` in the kerberized case only.
    kerberized - when True, build a Kerberos-authenticated client using the
                 keytab and krb config found under the 'node1' instance path.

    Also stores the resolved service address in ``self.hdfs_ip`` or
    ``self.hdfs_kerberized_ip``.
    """
    if not kerberized:
        self.hdfs_ip = self.get_instance_ip(self.hdfs_host)
        self.hdfs_api = HDFSApi(user="root", host=self.hdfs_host, data_port=self.hdfs_data_port,
                                proxy_port=self.hdfs_name_port, hdfs_ip=self.hdfs_ip)
        return

    keytab = p.abspath(p.join(self.instances['node1'].path, "secrets/clickhouse.keytab"))
    krb_conf = p.abspath(p.join(self.instances['node1'].path, "secrets/krb_long.conf"))
    self.hdfs_kerberized_ip = self.get_instance_ip(self.hdfs_kerberized_host)
    kdc_ip = self.get_instance_ip('hdfskerberos')

    self.hdfs_api = HDFSApi(
        user="root",
        timeout=timeout,
        kerberized=True,
        principal="root@TEST.CLICKHOUSE.TECH",
        keytab=keytab,
        krb_conf=krb_conf,
        host=self.hdfs_kerberized_host,
        protocol="http",
        proxy_port=self.hdfs_kerberized_name_port,
        data_port=self.hdfs_kerberized_data_port,
        hdfs_ip=self.hdfs_kerberized_ip,
        kdc_ip=kdc_ip)
2020-09-10 10:02:46 +00:00
2021-02-24 11:46:58 +00:00
def wait_kafka_is_available(self, kafka_docker_id, kafka_port, max_retries=50):
    """Poll Kafka once per second until ``check_kafka_is_available`` succeeds.

    Raises ``Exception`` once more than ``max_retries`` checks have failed.
    """
    retries = 0
    while not check_kafka_is_available(kafka_docker_id, kafka_port):
        retries += 1
        if retries > max_retries:
            raise Exception("Kafka is not available")
        logging.debug("Waiting for Kafka to start up")
        time.sleep(1)
def wait_hdfs_to_start(self, timeout=300, check_marker=False):
    """Wait until a small file can be written to HDFS through ``self.hdfs_api``.

    timeout - overall time budget in seconds.
    check_marker - when True, additionally require that
                   ``/preparations_done_marker`` can be read.

    Raises ``Exception`` after ``timeout`` seconds without success.
    """
    began = time.time()
    while time.time() - began < timeout:
        try:
            self.hdfs_api.write_data("/somefilewithrandomname222", "1")
            logging.debug("Connected to HDFS and SafeMode disabled! ")
            if check_marker:
                self.hdfs_api.read_data("/preparations_done_marker")
            return
        except Exception as ex:
            logging.exception("Can't connect to HDFS or preparations are not done yet " + str(ex))
            time.sleep(1)

    raise Exception("Can't wait HDFS to start")
2021-07-27 15:54:13 +00:00
def wait_mongo_to_start(self, timeout=30, secure=False):
    """Wait until MongoDB on localhost answers a database listing.

    timeout - overall time budget in seconds.
    secure - when True, connect with TLS and accept invalid certificates.

    Returns None both on success and on timeout. No exception is raised on
    timeout, so existing callers keep working; the timeout is logged instead.
    """
    connection_str = 'mongodb://{user}:{password}@{host}:{port}'.format(
        host='localhost', port=self.mongo_port, user='root', password='clickhouse')
    if secure:
        connection_str += '/?tls=true&tlsAllowInvalidCertificates=true'
    connection = pymongo.MongoClient(connection_str)
    start = time.time()
    while time.time() - start < timeout:
        try:
            # Reuse this single listing for the log line. Calling the legacy
            # `database_names()` afterwards costs a second round-trip, and an
            # error from it would be reported as a connection failure.
            database_names = connection.list_database_names()
            logging.debug(f"Connected to Mongo dbs: {database_names}")
            return
        except Exception as ex:
            logging.debug("Can't connect to Mongo " + str(ex))
            time.sleep(1)
    logging.warning("Mongo is not available after %s seconds", timeout)
2021-04-13 10:52:22 +00:00
def wait_minio_to_start(self, timeout=180, secure=False):
    """Wait until Minio answers and (re)create the test buckets empty.

    Stores the container addresses in ``self.minio_ip`` and
    ``self.minio_redirect_ip``, points the ``SSL_CERT_FILE`` environment
    variable at the Minio public certificate, and on success stores the
    client in ``self.minio_client``. Existing buckets are emptied, removed
    and created again.

    Raises ``Exception`` after ``timeout`` seconds without success.
    """
    self.minio_ip = self.get_instance_ip(self.minio_host)
    self.minio_redirect_ip = self.get_instance_ip(self.minio_redirect_host)

    os.environ['SSL_CERT_FILE'] = p.join(self.base_dir, self.minio_dir, 'certs', 'public.crt')
    # disable SSL check as we test ClickHouse and not Python library
    unverified_http = urllib3.PoolManager(cert_reqs='CERT_NONE')
    minio_client = Minio(f'{self.minio_ip}:{self.minio_port}',
                         access_key='minio',
                         secret_key='minio123',
                         secure=secure,
                         http_client=unverified_http)

    began = time.time()
    while time.time() - began < timeout:
        try:
            minio_client.list_buckets()
            logging.debug("Connected to Minio.")

            for bucket in [self.minio_bucket, self.minio_bucket_2]:
                if minio_client.bucket_exists(bucket):
                    object_names = (
                        obj.object_name
                        for obj in minio_client.list_objects_v2(bucket, recursive=True)
                    )
                    for error in minio_client.remove_objects(bucket, object_names):
                        logging.error(f"Error occured when deleting object {error}")
                    minio_client.remove_bucket(bucket)
                minio_client.make_bucket(bucket)
                logging.debug("S3 bucket '%s' created", bucket)

            self.minio_client = minio_client
            return
        except Exception as ex:
            logging.debug("Can't connect to Minio: %s", str(ex))
            time.sleep(1)

    raise Exception("Can't wait Minio to start")
2021-04-13 10:52:22 +00:00
def wait_schema_registry_to_start(self, timeout=180):
    """Poll the Kafka schema registry until it responds.

    A request is sent to the registry URL on localhost at
    ``self.schema_registry_port`` once per second.

    Returns:
        The connected ``CachedSchemaRegistryClient``.

    Raises:
        Exception: when the registry did not answer within ``timeout`` seconds.
    """
    sr_client = CachedSchemaRegistryClient({"url": 'http://localhost:{}'.format(self.schema_registry_port)})
    start = time.time()
    while time.time() - start < timeout:
        try:
            sr_client._send_request(sr_client.url)
            logging.debug("Connected to SchemaRegistry")
            return sr_client
        except Exception as ex:
            # Pass format string and argument separately; wrapping them in a
            # tuple makes logging print the tuple repr instead of the message.
            logging.debug("Can't connect to SchemaRegistry: %s", str(ex))
            time.sleep(1)

    raise Exception("Can't wait Schema Registry to start")
2021-04-13 10:52:22 +00:00
def wait_cassandra_to_start(self, timeout=180):
    """Wait until Cassandra answers both cqlsh and the Python driver.

    Each attempt first runs ``describe keyspaces`` through cqlsh inside the
    Cassandra container and then opens a driver connection. Attempts are
    repeated once per second.

    Raises:
        Exception: when Cassandra is not reachable within ``timeout`` seconds.
    """
    self.cassandra_ip = self.get_instance_ip(self.cassandra_host)
    driver = cassandra.cluster.Cluster(
        [self.cassandra_ip],
        port=self.cassandra_port,
        load_balancing_policy=RoundRobinPolicy(),
    )

    began = time.time()
    while time.time() - began < timeout:
        try:
            logging.info(f"Check Cassandra Online {self.cassandra_id} {self.cassandra_ip} {self.cassandra_port}")
            cqlsh_probe = f"/opt/cassandra/bin/cqlsh -u cassandra -p cassandra -e 'describe keyspaces' {self.cassandra_ip} {self.cassandra_port}"
            self.exec_in_container(self.cassandra_id, ["bash", "-c", cqlsh_probe], user='root')
            logging.info("Cassandra Online")
            driver.connect()
            logging.info("Connected Clients to Cassandra")
            return
        except Exception as ex:
            logging.warning("Can't connect to Cassandra: %s", str(ex))
            time.sleep(1)

    raise Exception("Can't wait Cassandra to start")
def start(self, destroy_dirs=True):
    """Bring up every enabled auxiliary service and all ClickHouse instances.

    Does nothing when the cluster is already up. Otherwise it runs a
    best-effort cleanup, prepares instance directories and the docker-compose
    env file, starts each enabled service with docker-compose (waiting for
    each one to become ready), starts the ClickHouse containers and waits for
    every instance to accept connections.

    Args:
        destroy_dirs: remove the existing instances directory (and pass
            ``destroy_dir`` to each instance) before starting.

    Raises:
        BaseException: any failure during startup is logged, ``shutdown()``
            is called, and the original exception is re-raised.
    """
    pytest_xdist_logging_to_separate_files.setup()
    logging.info("Running tests in {}".format(self.base_path))

    logging.debug("Cluster start called. is_up={}, destroy_dirs={}".format(self.is_up, destroy_dirs))
    if self.is_up:
        return

    try:
        self.cleanup()
    except Exception as e:
        logging.warning(f"Cleanup failed:{e}")

    try:
        # clickhouse_pull_cmd = self.base_cmd + ['pull']
        # print(f"Pulling images for {self.base_cmd}")
        # retry_exception(10, 5, subprocess_check_call, Exception, clickhouse_pull_cmd)

        if destroy_dirs and p.exists(self.instances_dir):
            logging.debug(f"Removing instances dir {self.instances_dir}")
            shutil.rmtree(self.instances_dir)

        for instance in list(self.instances.values()):
            logging.debug('Setup directory for instance: {} destroy_dirs: {}'.format(instance.name, destroy_dirs))
            instance.create_dir(destroy_dir=destroy_dirs)

        _create_env_file(os.path.join(self.env_file), self.env_variables)
        self.docker_client = docker.DockerClient(base_url='unix:///var/run/docker.sock', version=self.docker_api_version, timeout=600)

        common_opts = ['--verbose', 'up', '-d']

        if self.with_zookeeper_secure and self.base_zookeeper_cmd:
            logging.debug('Setup ZooKeeper Secure')
            logging.debug(f'Creating internal ZooKeeper dirs: {self.zookeeper_dirs_to_create}')
            for i in range(1, 3):
                if os.path.exists(self.zookeeper_instance_dir_prefix + f"{i}"):
                    shutil.rmtree(self.zookeeper_instance_dir_prefix + f"{i}")
            for dir_path in self.zookeeper_dirs_to_create:
                os.makedirs(dir_path)
            run_and_check(self.base_zookeeper_cmd + common_opts, env=self.env)
            self.up_called = True

            self.wait_zookeeper_secure_to_start()
            for command in self.pre_zookeeper_commands:
                self.run_kazoo_commands_with_retries(command, repeats=5)

        if self.with_zookeeper and self.base_zookeeper_cmd:
            logging.debug('Setup ZooKeeper')
            logging.debug(f'Creating internal ZooKeeper dirs: {self.zookeeper_dirs_to_create}')
            if self.use_keeper:
                for i in range(1, 4):
                    if os.path.exists(self.keeper_instance_dir_prefix + f"{i}"):
                        shutil.rmtree(self.keeper_instance_dir_prefix + f"{i}")
            else:
                for i in range(1, 3):
                    if os.path.exists(self.zookeeper_instance_dir_prefix + f"{i}"):
                        shutil.rmtree(self.zookeeper_instance_dir_prefix + f"{i}")

            for dir_path in self.zookeeper_dirs_to_create:
                os.makedirs(dir_path)

            if self.use_keeper:  # TODO: remove hardcoded paths from here
                for i in range(1, 4):
                    shutil.copy(os.path.join(HELPERS_DIR, f'keeper_config{i}.xml'), os.path.join(self.keeper_instance_dir_prefix + f"{i}", "config"))

            run_and_check(self.base_zookeeper_cmd + common_opts, env=self.env)
            self.up_called = True

            self.wait_zookeeper_to_start()
            for command in self.pre_zookeeper_commands:
                self.run_kazoo_commands_with_retries(command, repeats=5)

        if self.with_mysql_client and self.base_mysql_client_cmd:
            logging.debug('Setup MySQL Client')
            subprocess_check_call(self.base_mysql_client_cmd + common_opts)
            # Mark containers as started so shutdown() tears them down if the wait fails.
            self.up_called = True
            self.wait_mysql_client_to_start()

        if self.with_mysql and self.base_mysql_cmd:
            logging.debug('Setup MySQL')
            if os.path.exists(self.mysql_dir):
                shutil.rmtree(self.mysql_dir)
            os.makedirs(self.mysql_logs_dir)
            os.chmod(self.mysql_logs_dir, stat.S_IRWXO)
            subprocess_check_call(self.base_mysql_cmd + common_opts)
            self.up_called = True
            self.wait_mysql_to_start()

        if self.with_mysql8 and self.base_mysql8_cmd:
            logging.debug('Setup MySQL 8')
            if os.path.exists(self.mysql8_dir):
                shutil.rmtree(self.mysql8_dir)
            os.makedirs(self.mysql8_logs_dir)
            os.chmod(self.mysql8_logs_dir, stat.S_IRWXO)
            subprocess_check_call(self.base_mysql8_cmd + common_opts)
            # Mark containers as started so shutdown() tears them down if the wait fails.
            self.up_called = True
            self.wait_mysql8_to_start()

        if self.with_mysql_cluster and self.base_mysql_cluster_cmd:
            logging.debug('Setup MySQL Cluster')
            if os.path.exists(self.mysql_cluster_dir):
                shutil.rmtree(self.mysql_cluster_dir)
            os.makedirs(self.mysql_cluster_logs_dir)
            os.chmod(self.mysql_cluster_logs_dir, stat.S_IRWXO)

            subprocess_check_call(self.base_mysql_cluster_cmd + common_opts)
            self.up_called = True
            self.wait_mysql_cluster_to_start()

        if self.with_postgres and self.base_postgres_cmd:
            logging.debug('Setup Postgres')
            if os.path.exists(self.postgres_dir):
                shutil.rmtree(self.postgres_dir)
            os.makedirs(self.postgres_logs_dir)
            os.chmod(self.postgres_logs_dir, stat.S_IRWXO)

            subprocess_check_call(self.base_postgres_cmd + common_opts)
            self.up_called = True
            self.wait_postgres_to_start()

        if self.with_postgres_cluster and self.base_postgres_cluster_cmd:
            logging.debug('Setup Postgres Cluster')
            os.makedirs(self.postgres2_logs_dir)
            os.chmod(self.postgres2_logs_dir, stat.S_IRWXO)
            os.makedirs(self.postgres3_logs_dir)
            os.chmod(self.postgres3_logs_dir, stat.S_IRWXO)
            os.makedirs(self.postgres4_logs_dir)
            os.chmod(self.postgres4_logs_dir, stat.S_IRWXO)
            subprocess_check_call(self.base_postgres_cluster_cmd + common_opts)
            self.up_called = True
            self.wait_postgres_cluster_to_start()

        if self.with_kafka and self.base_kafka_cmd:
            logging.debug('Setup Kafka')
            subprocess_check_call(self.base_kafka_cmd + common_opts + ['--renew-anon-volumes'])
            self.up_called = True
            self.wait_kafka_is_available(self.kafka_docker_id, self.kafka_port)
            self.wait_schema_registry_to_start()

        if self.with_kerberized_kafka and self.base_kerberized_kafka_cmd:
            logging.debug('Setup kerberized kafka')
            run_and_check(self.base_kerberized_kafka_cmd + common_opts + ['--renew-anon-volumes'])
            self.up_called = True
            self.wait_kafka_is_available(self.kerberized_kafka_docker_id, self.kerberized_kafka_port, 100)

        if self.with_rabbitmq and self.base_rabbitmq_cmd:
            logging.debug('Setup RabbitMQ')
            os.makedirs(self.rabbitmq_logs_dir)
            os.chmod(self.rabbitmq_logs_dir, stat.S_IRWXO)

            # Retry the whole "up" up to five times; only the last try may throw.
            for i in range(5):
                subprocess_check_call(self.base_rabbitmq_cmd + common_opts + ['--renew-anon-volumes'])
                self.up_called = True
                self.rabbitmq_docker_id = self.get_instance_docker_id('rabbitmq1')
                logging.debug(f"RabbitMQ checking container try: {i}")
                if self.wait_rabbitmq_to_start(throw=(i == 4)):
                    break

        if self.with_hdfs and self.base_hdfs_cmd:
            logging.debug('Setup HDFS')
            os.makedirs(self.hdfs_logs_dir)
            os.chmod(self.hdfs_logs_dir, stat.S_IRWXO)
            subprocess_check_call(self.base_hdfs_cmd + common_opts)
            self.up_called = True
            self.make_hdfs_api()
            self.wait_hdfs_to_start()

        if self.with_kerberized_hdfs and self.base_kerberized_hdfs_cmd:
            logging.debug('Setup kerberized HDFS')
            os.makedirs(self.hdfs_kerberized_logs_dir)
            os.chmod(self.hdfs_kerberized_logs_dir, stat.S_IRWXO)
            run_and_check(self.base_kerberized_hdfs_cmd + common_opts)
            self.up_called = True
            self.make_hdfs_api(kerberized=True)
            self.wait_hdfs_to_start(check_marker=True)

        if self.with_nginx and self.base_nginx_cmd:
            logging.debug('Setup nginx')
            subprocess_check_call(self.base_nginx_cmd + common_opts + ['--renew-anon-volumes'])
            self.up_called = True
            self.nginx_docker_id = self.get_instance_docker_id('nginx')
            self.wait_nginx_to_start()

        if self.with_mongo and self.base_mongo_cmd:
            logging.debug('Setup Mongo')
            run_and_check(self.base_mongo_cmd + common_opts)
            self.up_called = True
            self.wait_mongo_to_start(30, secure=self.with_mongo_secure)

        if self.with_redis and self.base_redis_cmd:
            logging.debug('Setup Redis')
            subprocess_check_call(self.base_redis_cmd + common_opts)
            self.up_called = True
            time.sleep(10)

        if self.with_minio and self.base_minio_cmd:
            # Copy minio certificates to minio/certs
            os.mkdir(self.minio_dir)
            if self.minio_certs_dir is None:
                os.mkdir(os.path.join(self.minio_dir, 'certs'))
            else:
                shutil.copytree(os.path.join(self.base_dir, self.minio_certs_dir), os.path.join(self.minio_dir, 'certs'))

            minio_start_cmd = self.base_minio_cmd + common_opts

            logging.info("Trying to create Minio instance by command %s", ' '.join(map(str, minio_start_cmd)))
            run_and_check(minio_start_cmd)
            self.up_called = True
            logging.info("Trying to connect to Minio...")
            self.wait_minio_to_start(secure=self.minio_certs_dir is not None)

        if self.with_cassandra and self.base_cassandra_cmd:
            subprocess_check_call(self.base_cassandra_cmd + ['up', '-d'])
            self.up_called = True
            self.wait_cassandra_to_start()

        if self.with_jdbc_bridge and self.base_jdbc_bridge_cmd:
            os.makedirs(self.jdbc_driver_logs_dir)
            os.chmod(self.jdbc_driver_logs_dir, stat.S_IRWXO)

            subprocess_check_call(self.base_jdbc_bridge_cmd + ['up', '-d'])
            self.up_called = True
            self.jdbc_bridge_ip = self.get_instance_ip(self.jdbc_bridge_host)
            self.wait_for_url(f"http://{self.jdbc_bridge_ip}:{self.jdbc_bridge_port}/ping")

        clickhouse_start_cmd = self.base_cmd + ['up', '-d', '--no-recreate']
        logging.debug("Trying to create ClickHouse instance by command %s", ' '.join(map(str, clickhouse_start_cmd)))
        # Set before running so a failing "up" is still followed by a full teardown.
        self.up_called = True
        run_and_check(clickhouse_start_cmd)
        logging.debug("ClickHouse instance created")

        start_timeout = 300.0  # seconds
        for instance in self.instances.values():
            instance.docker_client = self.docker_client
            instance.ip_address = self.get_instance_ip(instance.name)

            logging.debug(f"Waiting for ClickHouse start in {instance.name}, ip: {instance.ip_address}...")
            instance.wait_for_start(start_timeout)
            logging.debug(f"ClickHouse {instance.name} started")

            instance.client = Client(instance.ip_address, command=self.client_bin_path)

        self.is_up = True

    except BaseException as e:
        logging.debug("Failed to start cluster: ")
        logging.debug(str(e))
        # format_exc() returns the traceback text; print_exc() returns None.
        logging.debug(traceback.format_exc())
        self.shutdown()
        raise
def shutdown(self, kill=True, ignore_fatal=True):
    """Stop the cluster, collect logs and report sanitizer/fatal findings.

    When docker-compose ``up`` was called, the docker logs are dumped to
    ``self.docker_logs_path`` and scanned for the sanitizer marker, the
    containers are stopped (killed if stopping fails), every instance's
    server log on the host is checked, and the compose project is brought
    down with its volumes. In every case ``cleanup()`` runs and the
    per-instance connection state is reset.

    Args:
        kill: stop the containers before bringing the project down.
        ignore_fatal: when False, "Fatal" messages in server logs are
            reported, except the signal 9 (KILL) termination message.

    Raises:
        Exception: if a sanitizer report or (with ``ignore_fatal=False``)
            a fatal message was found.
    """
    sanitizer_hit = None
    fatal_messages = None

    if not self.up_called:
        logging.warning("docker-compose up was not called. Trying to export docker.log for running containers")
    else:
        with open(self.docker_logs_path, "w+") as docker_log:
            try:
                subprocess.check_call(self.base_cmd + ['logs'], stdout=docker_log)  # STYLE_CHECK_ALLOW_SUBPROCESS_CHECK_CALL
            except Exception:
                logging.debug("Unable to get logs from docker.")
            docker_log.seek(0)
            for log_line in docker_log:
                if SANITIZER_SIGN in log_line:
                    # docker-compose prefixes each line with "<service> |".
                    sanitizer_hit = log_line.split('|')[0].strip()
                    break

        if kill:
            try:
                run_and_check(self.base_cmd + ['stop', '--timeout', '20'])
            except Exception as stop_error:
                logging.debug("Kill command failed during shutdown. {}".format(repr(stop_error)))
                logging.debug("Trying to kill forcefully")
                run_and_check(self.base_cmd + ['kill'])

        # Check server logs for Fatal messages and sanitizer failures.
        # NOTE: we cannot do this via docker since in case of Fatal message container may already die.
        for name, instance in self.instances.items():
            if instance.contains_in_log(SANITIZER_SIGN, from_host=True):
                sanitizer_hit = instance.grep_in_log(SANITIZER_SIGN, from_host=True, filename='stderr.log')
                logging.error("Sanitizer in instance %s log %s", name, sanitizer_hit)

            if ignore_fatal or not instance.contains_in_log("Fatal", from_host=True):
                continue
            fatal_messages = instance.grep_in_log("Fatal", from_host=True)
            if 'Child process was terminated by signal 9 (KILL)' in fatal_messages:
                fatal_messages = None
                continue
            logging.error("Crash in instance %s fatal log %s", name, fatal_messages)

        try:
            subprocess_check_call(self.base_cmd + ['down', '--volumes'])
        except Exception as down_error:
            logging.debug("Down + remove orphans failed during shutdown. {}".format(repr(down_error)))

    self.cleanup()

    self.is_up = False
    self.docker_client = None

    for instance in list(self.instances.values()):
        instance.docker_client = None
        instance.ip_address = None
        instance.client = None

    if sanitizer_hit is not None:
        raise Exception(
            "Sanitizer assert found in {} for instance {}".format(self.docker_logs_path, sanitizer_hit))
    if fatal_messages is not None:
        raise Exception("Fatal messages found: {}".format(fatal_messages))
2021-03-03 08:09:44 +00:00
2020-03-26 14:43:22 +00:00
def pause_container(self, instance_name):
    """Pause the named service's container through docker-compose."""
    pause_cmd = self.base_cmd + ['pause', instance_name]
    subprocess_check_call(pause_cmd)
2020-03-26 14:43:22 +00:00
# subprocess_check_call(self.base_cmd + ['kill', '-s SIGSTOP', instance_name])
def unpause_container(self, instance_name):
    """Resume the named service's paused container through docker-compose."""
    unpause_cmd = self.base_cmd + ['unpause', instance_name]
    subprocess_check_call(unpause_cmd)
2020-03-26 14:43:22 +00:00
# subprocess_check_call(self.base_cmd + ['kill', '-s SIGCONT', instance_name])
2019-06-18 07:20:14 +00:00
def open_bash_shell(self, instance_name):
    """Run ``/bin/bash`` in the named service via docker-compose ``exec``."""
    exec_cmd = self.base_cmd + ['exec', instance_name, '/bin/bash']
    os.system(' '.join(exec_cmd))
2021-05-24 08:23:04 +00:00
def get_kazoo_client(self, zoo_instance_name):
    """Return a started KazooClient connected to the given ZooKeeper node.

    The secure port with SSL is used when the cluster runs secure ZooKeeper,
    the plain port otherwise.

    Raises:
        Exception: if the cluster was configured without any ZooKeeper.
    """
    if self.with_zookeeper_secure:
        port, use_ssl = self.zookeeper_secure_port, True
    elif self.with_zookeeper:
        port, use_ssl = self.zookeeper_port, False
    else:
        raise Exception("Cluster has no ZooKeeper")

    ip = self.get_instance_ip(zoo_instance_name)
    logging.debug(f"get_kazoo_client: {zoo_instance_name}, ip:{ip}, port:{port}, use_ssl:{use_ssl}")
    client = KazooClient(
        hosts=f"{ip}:{port}",
        use_ssl=use_ssl,
        verify_certs=False,
        certfile=self.zookeeper_certfile,
        keyfile=self.zookeeper_keyfile,
    )
    client.start()
    return client
2021-05-24 08:23:04 +00:00
def run_kazoo_commands_with_retries(self, kazoo_callback, zoo_instance_name='zoo1', repeats=1, sleep_for=1):
    """Run ``kazoo_callback(zk)`` against a ZooKeeper node, retrying on Kazoo errors.

    The callback is tried up to ``repeats`` times in total. A
    ``KazooException`` on any attempt but the last is logged and followed by
    a ``sleep_for`` second pause; an exception on the last attempt
    propagates to the caller.

    Args:
        kazoo_callback: callable that receives the connected client.
        zoo_instance_name: ZooKeeper node to connect to.
        repeats: total number of attempts.
        sleep_for: seconds to wait between attempts.
    """
    zk = self.get_kazoo_client(zoo_instance_name)
    logging.debug(f"run_kazoo_commands_with_retries: {zoo_instance_name}, {kazoo_callback}")
    try:
        for _ in range(repeats - 1):
            try:
                kazoo_callback(zk)
                return
            except KazooException as e:
                logging.debug(repr(e))
                time.sleep(sleep_for)

        kazoo_callback(zk)
    finally:
        # Always release the connection, including on early success and on failure.
        zk.stop()
2017-08-30 16:25:34 +00:00
def add_zookeeper_startup_command(self, command):
    """Append ``command`` to the list of pre-ZooKeeper commands."""
    self.pre_zookeeper_commands += [command]
2017-08-30 16:25:34 +00:00
def stop_zookeeper_nodes(self, zk_nodes):
    """Stop each listed ZooKeeper service with docker-compose, in order."""
    for node in zk_nodes:
        logging.info("Stopping zookeeper node: %s", node)
        stop_cmd = self.base_zookeeper_cmd + ["stop", node]
        subprocess_check_call(stop_cmd)
def start_zookeeper_nodes(self, zk_nodes):
    """Start each listed ZooKeeper service with docker-compose, in order."""
    for node in zk_nodes:
        logging.info("Starting zookeeper node: %s", node)
        start_cmd = self.base_zookeeper_cmd + ["start", node]
        subprocess_check_call(start_cmd)
2017-08-30 16:25:34 +00:00
# Server command line; "{main_config_file}" is substituted per instance.
CLICKHOUSE_START_COMMAND = (
    "clickhouse server --config-file=/etc/clickhouse-server/{main_config_file}"
    " --log-file=/var/log/clickhouse-server/clickhouse-server.log "
    " --errorlog-file=/var/log/clickhouse-server/clickhouse-server.err.log"
)

# Wrapper that starts the server as a daemon and then waits on a background
# "tail", so the shell stays running independently of the server process.
CLICKHOUSE_STAY_ALIVE_COMMAND = f'bash -c "trap \'pkill tail\' INT TERM; {CLICKHOUSE_START_COMMAND} --daemon; coproc tail -f /dev/null; wait $$!"'
2021-09-23 14:35:04 +00:00
# docker-compose file template for a single ClickHouse instance; placeholders
# are filled with str.format(). Placeholders that stand for optional YAML
# fragments (extra volumes, network settings) may be filled with "".
# /run/xtables.lock passed inside for correct iptables --wait
DOCKER_COMPOSE_TEMPLATE = '''
version: '2.3'
services:
    {name}:
        image: {image}:{tag}
        hostname: {hostname}
        volumes:
            - {instance_config_dir}:/etc/clickhouse-server/
            - {db_dir}:/var/lib/clickhouse/
            - {logs_dir}:/var/log/clickhouse-server/
            - /etc/passwd:/etc/passwd:ro
            - /run/xtables.lock:/run/xtables.lock:ro
            {binary_volume}
            {odbc_bridge_volume}
            {library_bridge_volume}
            {external_dirs_volumes}
            {odbc_ini_path}
            {keytab_path}
            {krb5_conf}
        entrypoint: {entrypoint_cmd}
        tmpfs: {tmpfs}
        cap_add:
            - SYS_PTRACE
            - NET_ADMIN
            - IPC_LOCK
            - SYS_NICE
        depends_on: {depends_on}
        user: '{user}'
        env_file:
            - {env_file}
        security_opt:
            - label:disable
        dns_opt:
            - attempts:2
            - timeout:1
            - inet6
            - rotate
        {networks}
            {app_net}
                {ipv4_address}
                {ipv6_address}
                {net_aliases}
                    {net_alias1}
'''
2019-06-04 20:59:31 +00:00
class ClickHouseInstance:
2018-08-22 15:42:27 +00:00
def __init__(
        self, cluster, base_path, name, base_config_dir, custom_main_configs, custom_user_configs,
        custom_dictionaries,
        macros, with_zookeeper, zookeeper_config_path, with_mysql_client, with_mysql, with_mysql8, with_mysql_cluster, with_kafka, with_kerberized_kafka,
        with_rabbitmq, with_nginx, with_kerberized_hdfs, with_mongo, with_redis, with_minio, with_jdbc_bridge,
        with_cassandra, server_bin_path, odbc_bridge_bin_path, library_bridge_bin_path, clickhouse_path_dir, with_odbc_drivers, with_postgres, with_postgres_cluster,
        clickhouse_start_command=CLICKHOUSE_START_COMMAND,
        main_config_name="config.xml", users_config_name="users.xml", copy_common_configs=True,
        hostname=None, env_variables=None,
        image="clickhouse/integration-test", tag="latest",
        stay_alive=False, ipv4_address=None, ipv6_address=None, with_installed_binary=False, external_dirs=None, tmpfs=None, config_root_name="clickhouse"):
    """Record the configuration of one ClickHouse container.

    Only stores settings and derives paths; the docker client, IP address
    and query client start as None. Relative config paths are resolved
    against ``base_path``. Enabling ODBC drivers also turns on
    ``with_mysql``.
    """

    def resolve(relative):
        # Absolute path of a file or directory given relative to base_path.
        return p.abspath(p.join(base_path, relative))

    # Identity and owning cluster.
    self.name = name
    self.cluster = cluster
    self.base_cmd = cluster.base_cmd
    self.docker_id = cluster.get_instance_docker_id(self.name)
    self.hostname = self.name if hostname is None else hostname
    self.external_dirs = external_dirs
    self.tmpfs = tmpfs or []

    # Configuration sources.
    self.base_config_dir = resolve(base_config_dir) if base_config_dir else None
    self.custom_main_config_paths = [resolve(cfg) for cfg in custom_main_configs]
    self.custom_user_config_paths = [resolve(cfg) for cfg in custom_user_configs]
    self.custom_dictionaries_paths = [resolve(cfg) for cfg in custom_dictionaries]
    self.clickhouse_path_dir = resolve(clickhouse_path_dir) if clickhouse_path_dir else None
    self.kerberos_secrets_dir = resolve('secrets')
    self.macros = {} if macros is None else macros
    self.zookeeper_config_path = zookeeper_config_path

    # Binaries.
    self.server_bin_path = server_bin_path
    self.odbc_bridge_bin_path = odbc_bridge_bin_path
    self.library_bridge_bin_path = library_bridge_bin_path

    # External services this instance depends on.
    self.with_zookeeper = with_zookeeper
    self.with_mysql_client = with_mysql_client
    # ODBC drivers imply MySQL regardless of the with_mysql argument.
    self.with_mysql = True if with_odbc_drivers else with_mysql
    self.with_mysql8 = with_mysql8
    self.with_mysql_cluster = with_mysql_cluster
    self.with_postgres = with_postgres
    self.with_postgres_cluster = with_postgres_cluster
    self.with_kafka = with_kafka
    self.with_kerberized_kafka = with_kerberized_kafka
    self.with_rabbitmq = with_rabbitmq
    self.with_nginx = with_nginx
    self.with_kerberized_hdfs = with_kerberized_hdfs
    self.with_mongo = with_mongo
    self.with_redis = with_redis
    self.with_minio = with_minio
    self.with_cassandra = with_cassandra
    self.with_jdbc_bridge = with_jdbc_bridge

    # Server configuration file names and start command.
    self.main_config_name = main_config_name
    self.users_config_name = users_config_name
    self.copy_common_configs = copy_common_configs
    self.clickhouse_start_command = clickhouse_start_command.replace("{main_config_file}", self.main_config_name)

    # Per-instance directory and compose file.
    self.path = p.join(self.cluster.instances_dir, name)
    self.docker_compose_path = p.join(self.path, 'docker-compose.yml')
    self.env_variables = env_variables or {}
    self.env_file = self.cluster.env_file

    self.odbc_ini_path = self.path + "/odbc.ini:/etc/odbc.ini" if with_odbc_drivers else ""

    if with_kerberized_kafka or with_kerberized_hdfs:
        compose_dir = os.path.dirname(self.docker_compose_path)
        self.keytab_path = '- ' + compose_dir + "/secrets:/tmp/keytab"
        self.krb5_conf = '- ' + compose_dir + "/secrets/krb.conf:/etc/krb5.conf:ro"
    else:
        self.keytab_path = ""
        self.krb5_conf = ""

    # Runtime state; starts unset.
    self.docker_client = None
    self.ip_address = None
    self.client = None
    self.is_up = False

    # Container image and networking.
    self.image = image
    self.tag = tag
    self.stay_alive = stay_alive
    self.ipv4_address = ipv4_address
    self.ipv6_address = ipv6_address
    self.with_installed_binary = with_installed_binary
    self.config_root_name = config_root_name
2021-02-12 15:51:21 +00:00
2021-04-25 02:25:46 +00:00
def is_built_with_sanitizer(self, sanitizer_name=''):
    """Return True if the server's CXX_FLAGS contain ``-fsanitize=<sanitizer_name>``.

    With the default empty name any ``-fsanitize=`` flag matches.
    """
    cxx_flags = self.query("SELECT value FROM system.build_options WHERE name = 'CXX_FLAGS'")
    wanted_flag = "-fsanitize={}".format(sanitizer_name)
    return wanted_flag in cxx_flags
2021-08-19 11:32:32 +00:00
def is_debug_build(self):
    """Return True if the server's CXX_FLAGS do not mention ``NDEBUG``."""
    cxx_flags = self.query("SELECT value FROM system.build_options WHERE name = 'CXX_FLAGS'")
    has_ndebug = 'NDEBUG' in cxx_flags
    return not has_ndebug
2021-04-25 02:25:46 +00:00
def is_built_with_thread_sanitizer(self):
    """Return whether the server was built with ``-fsanitize=thread``."""
    sanitizer = 'thread'
    return self.is_built_with_sanitizer(sanitizer)
2020-06-22 13:10:25 +00:00
def is_built_with_address_sanitizer(self):
    """Return whether the server was built with ``-fsanitize=address``."""
    sanitizer = 'address'
    return self.is_built_with_sanitizer(sanitizer)
2021-04-25 02:25:46 +00:00
def is_built_with_memory_sanitizer(self):
    """Return whether the server was built with ``-fsanitize=memory``."""
    sanitizer = 'memory'
    return self.is_built_with_sanitizer(sanitizer)
2017-08-14 01:29:19 +00:00
# Connects to the instance via clickhouse-client, sends a query (1st argument) and returns the answer
def query(self, sql,
          stdin=None,
          timeout=None,
          settings=None,
          user=None,
          password=None,
          database=None,
          ignore_error=False,
          query_id=None):
    """Log the query and forward it, with all options, to ``self.client.query``.

    Returns whatever the client returns.
    """
    logging.debug("Executing query %s on %s", sql, self.name)
    client_options = {
        "stdin": stdin,
        "timeout": timeout,
        "settings": settings,
        "user": user,
        "password": password,
        "database": database,
        "ignore_error": ignore_error,
        "query_id": query_id,
    }
    return self.client.query(sql, **client_options)
def query_with_retry(self, sql, stdin=None, timeout=None, settings=None, user=None, password=None, database=None,
                     ignore_error=False,
                     retry_count=20, sleep_time=0.5, check_callback=lambda x: True):
    """Run a query up to ``retry_count`` times until ``check_callback`` accepts the result.

    Args:
        sql: query text; the remaining query options are forwarded to ``query``.
        retry_count: maximum number of attempts.
        sleep_time: seconds to wait between attempts.
        check_callback: predicate applied to each result; the first accepted
            result is returned immediately.

    Returns:
        The first accepted result. If none was accepted, the most recent
        result obtained (when any attempt returned one).

    Raises:
        Exception: if no attempt ever produced a result. The last underlying
            error, if any, is chained as ``__cause__``.
    """
    logging.debug(f"Executing query {sql} on {self.name}")
    result = None
    last_error = None
    for attempt in range(1, retry_count + 1):
        try:
            result = self.query(sql, stdin=stdin, timeout=timeout, settings=settings, user=user, password=password,
                                database=database, ignore_error=ignore_error)
            if check_callback(result):
                return result
        except Exception as ex:
            last_error = ex
            logging.debug("Retry {} got exception {}".format(attempt, ex))
        # No point in sleeping once the attempts are exhausted.
        if attempt < retry_count:
            time.sleep(sleep_time)

    if result is not None:
        return result
    raise Exception("Can't execute query {}".format(sql)) from last_error
2017-07-26 12:31:55 +00:00
# As query() but doesn't wait response and returns response handler
2021-07-01 14:41:59 +00:00
def get_query_request(self, sql, *args, **kwargs):
    """Log the query and return ``self.client.get_query_request`` for it.

    Extra positional and keyword arguments are forwarded unchanged.
    """
    logging.debug(f"Executing query {sql} on {self.name}")
    request = self.client.get_query_request(sql, *args, **kwargs)
    return request
2019-04-07 00:31:20 +00:00
# Connects to the instance via clickhouse-client, sends a query (1st argument), expects an error and return its code
def query_and_get_error(self, sql, stdin=None, timeout=None, settings=None, user=None, password=None,
                        database=None):
    """Log the query and delegate to ``self.client.query_and_get_error``."""
    logging.debug(f"Executing query {sql} on {self.name}")
    client_options = dict(stdin=stdin, timeout=timeout, settings=settings, user=user,
                          password=password, database=database)
    return self.client.query_and_get_error(sql, **client_options)
2019-04-07 00:31:20 +00:00
2019-07-17 11:55:18 +00:00
# The same as query_and_get_error but ignores successful query.
def query_and_get_answer_with_error(self, sql, stdin=None, timeout=None, settings=None, user=None, password=None,
                                    database=None):
    """Log the query and delegate to ``self.client.query_and_get_answer_with_error``."""
    logging.debug(f"Executing query {sql} on {self.name}")
    client_options = dict(stdin=stdin, timeout=timeout, settings=settings, user=user,
                          password=password, database=database)
    return self.client.query_and_get_answer_with_error(sql, **client_options)
2019-07-17 11:55:18 +00:00
# Connects to the instance via HTTP interface, sends a query and returns the answer
def http_query(self, sql, data=None, params=None, user=None, password=None, expect_fail_and_get_error=False):
    """Send a query to port 8123 of the instance over HTTP.

    The query goes in the URL together with a copy of ``params``; a POST is
    used when ``data`` is given, a GET otherwise. Basic auth is applied when
    ``user`` is set.

    Returns:
        The response body, or "<code> <reason>: <body>" when
        ``expect_fail_and_get_error`` is set and the request failed.

    Raises:
        Exception: if the request outcome (success or failure) is the
            opposite of what ``expect_fail_and_get_error`` asks for.
    """
    logging.debug(f"Executing query {sql} on {self.name} via HTTP interface")

    # Work on a copy so the caller's dict is never modified.
    params = {} if params is None else params.copy()
    params["query"] = sql

    auth = None
    if user:
        auth = requests.auth.HTTPBasicAuth(user, password if password else '')

    url = "".join(["http://", self.ip_address, ":8123/?", urllib.parse.urlencode(params)])
    response = requests.post(url, data, auth=auth) if data else requests.get(url, auth=auth)

    def describe_response():
        status = response.status_code
        return str(status) + " " + http.client.responses[status] + ": " + response.text

    if expect_fail_and_get_error:
        if response.ok:
            raise Exception("ClickHouse HTTP server is expected to fail, but succeeded: " + response.text)
        return describe_response()

    if not response.ok:
        raise Exception("ClickHouse HTTP server returned " + describe_response())
    return response.text
2019-11-14 02:20:06 +00:00
# Sends an arbitrary HTTP request to the instance's HTTP interface (port 8123) and returns the response object
def http_request(self, url, method='GET', params=None, data=None, headers=None):
    """Send an HTTP request to port 8123 of the instance.

    ``url`` is the path (without a leading slash) appended to the instance
    address. Returns the value of ``requests.request``.
    """
    logging.debug(f"Sending HTTP request {url} to {self.name}")
    full_url = "".join(["http://", self.ip_address, ":8123/", url])
    return requests.request(method=method, url=full_url, params=params, data=data, headers=headers)
# Connects to the instance via HTTP interface, sends a query, expects an error and return the error message
def http_query_and_get_error(self, sql, data=None, params=None, user=None, password=None):
    """Run ``http_query`` with ``expect_fail_and_get_error=True`` and return its result."""
    logging.debug(f"Executing query {sql} on {self.name} via HTTP interface")
    error_text = self.http_query(sql=sql, data=data, params=params, user=user, password=password,
                                 expect_fail_and_get_error=True)
    return error_text
def stop_clickhouse(self, stop_wait_sec=30, kill=False):
    """Stop the ClickHouse process inside a stay-alive container.

    Sends ``pkill`` (with ``-9`` when ``kill`` is set) and waits up to
    ``stop_wait_sec`` seconds for the process to disappear. If it is still
    running after that, a gdb backtrace of all threads is written to the
    instance's ``logs/stdout.log`` and the process is killed forcefully.
    Errors raised while stopping are logged as warnings, not propagated.

    Raises:
        Exception: if the instance was not created with ``stay_alive=True``.
    """
    if not self.stay_alive:
        raise Exception("clickhouse can be stopped only with stay_alive=True instance")
    try:
        ps_clickhouse = self.exec_in_container(["bash", "-c", "ps -C clickhouse"], nothrow=True, user='root')
        # Output consisting of only the header row means no process matched.
        # Compare column names rather than exact text, since column padding
        # and a trailing newline must not matter.
        if isinstance(ps_clickhouse, str) and ps_clickhouse.split() == ["PID", "TTY", "STAT", "TIME", "COMMAND"]:
            logging.warning("ClickHouse process already stopped")
            return

        self.exec_in_container(["bash", "-c", "pkill {} clickhouse".format("-9" if kill else "")], user='root')

        start_time = time.time()
        stopped = False
        while time.time() <= start_time + stop_wait_sec:
            pid = self.get_process_pid("clickhouse")
            if pid is None:
                stopped = True
                break
            time.sleep(1)

        if not stopped:
            pid = self.get_process_pid("clickhouse")
            if pid is not None:
                logging.warning(f"Force kill clickhouse in stop_clickhouse. ps:{pid}")
                # Dump all thread stacks before killing, to show why the server hung.
                self.exec_in_container(["bash", "-c", f"gdb -batch -ex 'thread apply all bt full' -p {pid} > {os.path.join(self.path, 'logs/stdout.log')}"], user='root')
                self.stop_clickhouse(kill=True)
            else:
                ps_all = self.exec_in_container(["bash", "-c", "ps aux"], nothrow=True, user='root')
                logging.warning(f"We want force stop clickhouse, but no clickhouse-server is running\n{ps_all}")
                return
    except Exception as e:
        logging.warning(f"Stop ClickHouse raised an error {e}")
2021-10-28 07:27:27 +00:00
def start_clickhouse(self, start_wait_sec=60):
    """Start the ClickHouse server inside a stay-alive container and wait for it.

    Until ``start_wait_sec`` seconds have elapsed, the method launches the
    server as a daemon whenever no process is found, and otherwise waits for
    it to answer queries; a failed wait kills the process so the next
    iteration starts a fresh one.

    Raises:
        Exception: if the instance lacks ``stay_alive=True`` or the server
            could not be started in time.
    """
    if not self.stay_alive:
        raise Exception("ClickHouse can be started again only with stay_alive=True instance")

    start_time = time.time()
    retry_pause = 0.5

    while start_time + start_wait_sec >= time.time():
        # sometimes after SIGKILL (hard reset) server may refuse to start for some time
        # for different reasons.
        pid = self.get_process_pid("clickhouse")
        if pid is not None:
            logging.debug("Clickhouse process running.")
            try:
                # Only wait for the time remaining in the overall budget.
                self.wait_start(start_wait_sec + start_time - time.time())
                return
            except Exception:
                logging.warning(f"Current start attempt failed. Will kill {pid} just in case.")
                self.exec_in_container(["bash", "-c", f"kill -9 {pid}"], user='root', nothrow=True)
                time.sleep(retry_pause)
        else:
            logging.debug("No clickhouse process running. Start new one.")
            launch = "{} --daemon".format(self.clickhouse_start_command)
            self.exec_in_container(["bash", "-c", launch], user=str(os.getuid()))
            time.sleep(1)

    raise Exception("Cannot start ClickHouse, see additional info in logs")
def wait_start(self, start_wait_sec):
    """Wait until the ClickHouse server process answers a trivial query.

    Polls for up to ``start_wait_sec`` seconds: checks that a "clickhouse"
    process exists and runs ``select 20`` through ``exec_query_with_retry``.
    Returns as soon as the query succeeds.

    If the time runs out, the process list is logged, a gdb thread dump of
    the still-running process is attempted, and an exception is raised.

    Raises:
        QueryRuntimeException: the last query error seen, when time ran out.
        Exception: when no "clickhouse" process is found, or when time ran
            out before a single attempt could be made.
    """
    start_time = time.time()
    last_err = None
    while time.time() <= start_time + start_wait_sec:
        try:
            if self.get_process_pid("clickhouse") is None:
                raise Exception("ClickHouse server is not running. Check logs.")
            exec_query_with_retry(self, 'select 20', retry_count=10, silent=True)
            return
        except QueryRuntimeException as err:
            last_err = err
            # A query error is only worth retrying while the process is alive.
            if self.get_process_pid("clickhouse") is None:
                raise Exception("ClickHouse server is not running. Check logs.")
            logging.warning(f"ERROR {err}")

    logging.error("No time left to start. But process is still running. Will dump threads.")
    ps_clickhouse = self.exec_in_container(["bash", "-c", "ps -C clickhouse"], nothrow=True, user='root')
    logging.info(f"PS RESULT:\n{ps_clickhouse}")

    pid = self.get_process_pid("clickhouse")
    if pid is not None:
        # The dump is purely diagnostic: do not let a gdb failure hide the
        # real reason for the timeout.
        self.exec_in_container(["bash", "-c", f"gdb -batch -ex 'thread apply all bt full' -p {pid}"],
                               user='root', nothrow=True)

    if last_err is not None:
        raise last_err
    # The loop never got a chance to run (e.g. a non-positive time budget):
    # returning silently would be indistinguishable from a successful start.
    raise Exception("No time left to wait for ClickHouse server start")
def restart_clickhouse(self, stop_start_wait_sec=60, kill=False):
    """Stop the server and start it again.

    ``stop_start_wait_sec`` is passed as the wait budget to both the stop and
    the start step; ``kill`` is forwarded to ``stop_clickhouse``.
    """
    wait_budget = stop_start_wait_sec
    self.stop_clickhouse(wait_budget, kill)
    self.start_clickhouse(wait_budget)
def exec_in_container(self, cmd, detach=False, nothrow=False, **kwargs):
    """Run ``cmd`` in this instance's container via the owning cluster.

    All arguments are forwarded to ``cluster.exec_in_container`` together
    with this instance's ``docker_id``; its result is returned unchanged.
    """
    owner = self.cluster
    container_id = self.docker_id
    return owner.exec_in_container(container_id, cmd, detach, nothrow, **kwargs)
2021-10-06 13:08:25 +00:00
def rotate_logs(self):
    """Send SIGHUP to the "clickhouse server" process inside the container.

    Raises:
        Exception: if no "clickhouse server" process is found, instead of
            running a meaningless ``kill -HUP None`` command.
    """
    pid = self.get_process_pid('clickhouse server')
    if pid is None:
        raise Exception("Cannot rotate logs: ClickHouse server is not running")
    self.exec_in_container(["bash", "-c", f"kill -HUP {pid}"], user='root')
2021-10-06 13:08:25 +00:00
2021-10-19 10:19:43 +00:00
def contains_in_log(self, substring, from_host=False, filename='clickhouse-server.log'):
    """Return True if ``substring`` is found by zgrep in the given log file.

    With ``from_host`` the search runs on the host against ``self.logs_dir``
    and includes rotated files (``filename*``); otherwise it runs inside the
    container against ``/var/log/clickhouse-server/filename`` only.
    A missing log file yields False.
    """
    if from_host:
        # First check that the file exists, but look through all rotated logs as well.
        host_cmd = f'[ -f {self.logs_dir}/{filename} ] && zgrep -aH "{substring}" {self.logs_dir}/{filename}* || true'
        output = subprocess_check_call(["bash", "-c", host_cmd])
    else:
        container_cmd = f'[ -f /var/log/clickhouse-server/{filename} ] && zgrep -aH "{substring}" /var/log/clickhouse-server/{filename} || true'
        output = self.exec_in_container(["bash", "-c", container_cmd])

    return len(output) > 0
2021-10-19 10:19:43 +00:00
def grep_in_log(self, substring, from_host=False, filename='clickhouse-server.log'):
    """Return the zgrep output for ``substring`` in the log file and its rotations.

    With ``from_host`` the search runs on the host against ``self.logs_dir``;
    otherwise it runs inside the container against
    ``/var/log/clickhouse-server``. A missing log file yields empty output.
    """
    logging.debug("grep in log called %s", substring)

    if from_host:
        # First check that the file exists, but look through all rotated logs as well.
        host_cmd = f'[ -f {self.logs_dir}/{filename} ] && zgrep -a "{substring}" {self.logs_dir}/{filename}* || true'
        matches = subprocess_check_call(["bash", "-c", host_cmd])
    else:
        container_cmd = f'[ -f /var/log/clickhouse-server/{filename} ] && zgrep -a "{substring}" /var/log/clickhouse-server/{filename}* || true'
        matches = self.exec_in_container(["bash", "-c", container_cmd])

    logging.debug("grep result %s", matches)
    return matches
2021-03-19 16:12:33 +00:00
def count_in_log(self, substring):
    """Count lines of the server log inside the container that match ``substring``.

    Returns the raw output of ``grep -a ... | wc -l`` as produced by
    ``exec_in_container`` (not converted to int).
    """
    count_cmd = f'grep -a "{substring}" /var/log/clickhouse-server/clickhouse-server.log | wc -l'
    return self.exec_in_container(["bash", "-c", count_cmd])
2021-03-19 16:12:33 +00:00
def wait_for_log_line(self, regexp, filename='/var/log/clickhouse-server/clickhouse-server.log', timeout=30, repetitions=1, look_behind_lines=100):
    """Follow a log file in the container until ``regexp`` matches enough lines.

    Runs ``timeout <timeout> tail -Fn<look_behind_lines> <filename> | grep -Em <repetitions> <regexp>``
    inside the container.

    Returns:
        The time in seconds spent waiting.

    Raises:
        Exception: when ``repetitions`` > 1 and fewer matching lines were
            collected than requested.
    """
    began_at = time.time()
    follow_cmd = 'timeout {} tail -Fn{} "{}" | grep -Em {} {}'.format(
        timeout, look_behind_lines, filename, repetitions, shlex.quote(regexp))
    output = self.exec_in_container(["bash", "-c", follow_cmd])

    # With repetitions > 1 grep reports success even if it collected fewer
    # lines than requested, so the count has to be verified here.
    if repetitions > 1:
        found = len(output.splitlines())
        if found < repetitions:
            logging.debug("wait_for_log_line: those lines were found during {} seconds:".format(timeout))
            logging.debug(output)
            raise Exception("wait_for_log_line: Not enough repetitions: {} found, while {} expected".format(found, repetitions))

    elapsed = time.time() - began_at
    logging.debug('{} log line(s) matching "{}" appeared in a {:.3f} seconds'.format(repetitions, regexp, elapsed))
    return elapsed
2020-11-17 14:36:04 +00:00
def file_exists(self, path):
    """Return True if ``path`` exists inside the container (checked with ``[ -e ]``)."""
    probe_cmd = f"echo $(if [ -e '{path}' ]; then echo 'yes'; else echo 'no'; fi)"
    answer = self.exec_in_container(["bash", "-c", probe_cmd])
    return answer == 'yes\n'
2019-02-21 17:34:19 +00:00
def copy_file_to_container(self, local_path, dest_path):
    """Copy ``local_path`` to ``dest_path`` in this instance's container via the owning cluster."""
    owner = self.cluster
    container_id = self.docker_id
    return owner.copy_file_to_container(container_id, local_path, dest_path)
2019-02-21 17:34:19 +00:00
2019-06-21 08:03:13 +00:00
def get_process_pid(self, process_name):
    """Return the pid of the first process in the container matching ``process_name``.

    The match is done by grepping ``ps ax`` output, with the grep itself and
    lines containing 'coproc' or 'bash -c' filtered out.

    Returns:
        The pid as int, or None when nothing matches or the first output
        line is not a number.
    """
    output = self.exec_in_container(["bash", "-c",
        "ps ax | grep '{}' | grep -v 'grep' | grep -v 'coproc' | grep -v 'bash -c' | awk '{{print $1}}'".format(
            process_name)])
    if not output:
        return None
    try:
        return int(output.split('\n')[0].strip())
    except ValueError:
        # Only a non-numeric first line means "no pid"; anything else
        # (e.g. KeyboardInterrupt) must not be swallowed.
        return None
def restart_with_original_version(self, stop_start_wait_sec=300, callback_onstop=None, signal=15):
    """Stop the server, restore the saved original binary and start it again.

    Steps: send ``signal`` to clickhouse via pkill, wait up to
    ``stop_start_wait_sec`` seconds for "clickhouse server" to disappear
    (SIGKILL if it does not), call ``callback_onstop(self)`` if given, copy
    ``/usr/share/clickhouse_original`` over ``/usr/bin/clickhouse``, copy the
    odbc-bridge binary, start the server as a daemon and wait for it with
    whatever is left of the time budget.

    Raises:
        Exception: if the instance is not stay_alive, or if no time is left
            for the start phase.
    """
    begin_time = time.time()
    if not self.stay_alive:
        raise Exception("Cannot restart not stay alive container")

    self.exec_in_container(["bash", "-c", "pkill -{} clickhouse".format(signal)], user='root')

    # wait stop
    retries = int(stop_start_wait_sec / 0.5)
    for _ in range(retries):
        if not self.get_process_pid("clickhouse server"):
            break
        time.sleep(0.5)

    # force kill if server hangs
    if self.get_process_pid("clickhouse server"):
        # server can die before kill, so don't throw exception, it's expected
        self.exec_in_container(["bash", "-c", "pkill -{} clickhouse".format(9)], nothrow=True, user='root')

    if callback_onstop:
        callback_onstop(self)

    self.exec_in_container(["bash", "-c", "echo 'restart_with_original_version: From version' && /usr/bin/clickhouse server --version && echo 'To version' && /usr/share/clickhouse_original server --version"])
    self.exec_in_container(
        ["bash", "-c", "cp /usr/share/clickhouse_original /usr/bin/clickhouse && chmod 777 /usr/bin/clickhouse"],
        user='root')
    # chmod the file that was just copied (the bridge), not the server binary.
    self.exec_in_container(
        ["bash", "-c",
         "cp /usr/share/clickhouse-odbc-bridge_fresh /usr/bin/clickhouse-odbc-bridge && chmod 777 /usr/bin/clickhouse-odbc-bridge"],
        user='root')
    self.exec_in_container(["bash", "-c", "{} --daemon".format(self.clickhouse_start_command)], user=str(os.getuid()))

    # wait start
    time_left = begin_time + stop_start_wait_sec - time.time()
    if time_left <= 0:
        raise Exception("No time left during restart")
    self.wait_start(time_left)
2021-06-16 11:15:08 +00:00
def restart_with_latest_version(self, stop_start_wait_sec=300, callback_onstop=None, signal=15):
    """Stop the server, switch to the fresh binary and start it again.

    Steps: send ``signal`` to clickhouse via pkill, wait up to
    ``stop_start_wait_sec`` seconds for "clickhouse server" to disappear
    (SIGKILL if it does not), call ``callback_onstop(self)`` if given, save
    the current binary as ``/usr/share/clickhouse_original``, copy
    ``/usr/share/clickhouse_fresh`` over ``/usr/bin/clickhouse``, copy the
    odbc-bridge binary, start the server as a daemon and wait for it with
    whatever is left of the time budget.

    Raises:
        Exception: if the instance is not stay_alive, or if no time is left
            for the start phase.
    """
    begin_time = time.time()
    if not self.stay_alive:
        raise Exception("Cannot restart not stay alive container")

    self.exec_in_container(["bash", "-c", "pkill -{} clickhouse".format(signal)], user='root')

    # wait stop
    retries = int(stop_start_wait_sec / 0.5)
    for _ in range(retries):
        if not self.get_process_pid("clickhouse server"):
            break
        time.sleep(0.5)

    # force kill if server hangs
    if self.get_process_pid("clickhouse server"):
        # server can die before kill, so don't throw exception, it's expected
        self.exec_in_container(["bash", "-c", "pkill -{} clickhouse".format(9)], nothrow=True, user='root')

    if callback_onstop:
        callback_onstop(self)

    self.exec_in_container(
        ["bash", "-c", "cp /usr/bin/clickhouse /usr/share/clickhouse_original"],
        user='root')
    self.exec_in_container(
        ["bash", "-c", "cp /usr/share/clickhouse_fresh /usr/bin/clickhouse && chmod 777 /usr/bin/clickhouse"],
        user='root')
    # '&&' after the second echo so that the new version is actually printed
    # instead of echoing the command line as text.
    self.exec_in_container(["bash", "-c", "echo 'restart_with_latest_version: From version' && /usr/share/clickhouse_original server --version && echo 'To version' && /usr/share/clickhouse_fresh server --version"])
    # chmod the file that was just copied (the bridge), not the server binary.
    self.exec_in_container(
        ["bash", "-c",
         "cp /usr/share/clickhouse-odbc-bridge_fresh /usr/bin/clickhouse-odbc-bridge && chmod 777 /usr/bin/clickhouse-odbc-bridge"],
        user='root')
    self.exec_in_container(["bash", "-c", "{} --daemon".format(self.clickhouse_start_command)], user=str(os.getuid()))

    # wait start
    time_left = begin_time + stop_start_wait_sec - time.time()
    if time_left <= 0:
        raise Exception("No time left during restart")
    self.wait_start(time_left)
def get_docker_handle(self):
    """Return the cluster's docker handle for this instance's ``docker_id``."""
    owner = self.cluster
    return owner.get_docker_handle(self.docker_id)
def stop(self):
    """Stop this instance's container through its docker handle."""
    container = self.get_docker_handle()
    container.stop()
def start(self):
    """Start this instance's container through its docker handle."""
    container = self.get_docker_handle()
    container.start()
def wait_for_start(self, start_timeout=None, connection_timeout=None):
    """Block until something listens on port 9000 of the instance address.

    The container status is re-read on every iteration. The normal deadline
    is ``start_timeout`` seconds; while the container is running and the
    server log keeps growing, the deadline is extended to
    ``connection_timeout`` seconds. On success ``self.is_up`` is set to True.

    Raises:
        Exception: on an invalid timeout combination, when the container has
            exited, or when the deadline passes without a connection.
    """
    handle = self.get_docker_handle()

    if start_timeout is None or start_timeout <= 0:
        raise Exception("Invalid timeout: {}".format(start_timeout))

    if connection_timeout is not None and connection_timeout < start_timeout:
        raise Exception("Connection timeout {} should be grater then start timeout {}"
                        .format(connection_timeout, start_timeout))

    start_time = time.time()
    prev_rows_in_log = 0

    def has_new_rows_in_log():
        """Tell whether the server log has grown since the previous call."""
        nonlocal prev_rows_in_log
        try:
            rows_in_log = int(self.count_in_log(".*").strip())
        except ValueError:
            return False
        grew = rows_in_log > prev_rows_in_log
        prev_rows_in_log = rows_in_log
        return grew

    # Connection errors that only mean "not listening yet".
    retriable_errnos = (errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.ENETUNREACH)

    while True:
        handle.reload()
        status = handle.status
        if status == 'exited':
            raise Exception(f"Instance `{self.name}' failed to start. Container status: {status}, logs: {handle.logs().decode('utf-8')}")

        deadline = start_time + start_timeout
        # It is possible that server starts slowly.
        # If container is running, and there is some progress in log, check connection_timeout.
        if connection_timeout and status == 'running' and has_new_rows_in_log():
            deadline = start_time + connection_timeout

        current_time = time.time()
        if current_time >= deadline:
            raise Exception(f"Timed out while waiting for instance `{self.name}' with ip address {self.ip_address} to start. "
                            f"Container status: {status}, logs: {handle.logs().decode('utf-8')}")

        socket_timeout = min(start_timeout, deadline - current_time)

        # Repeatedly poll the instance address until there is something that listens there.
        # Usually it means that ClickHouse is ready to accept queries.
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(socket_timeout)
            sock.connect((self.ip_address, 9000))
            self.is_up = True
            return
        except socket.timeout:
            continue
        except socket.error as e:
            if e.errno not in retriable_errnos:
                raise
            time.sleep(0.1)
        finally:
            sock.close()
2021-09-22 15:00:08 +00:00
def dict_to_xml(self, dictionary):
    """Render ``dictionary`` as XML with dict2xml, wrapped in ``self.config_root_name``."""
    return dict2xml(dictionary, wrap=self.config_root_name, indent=" ", newlines=True)
2018-08-22 15:42:27 +00:00
@property
def odbc_drivers(self):
    """ODBC driver definitions keyed by driver name.

    Returns an empty dict when ``self.odbc_ini_path`` is not set; otherwise
    the SQLite3, MySQL and PostgreSQL sections, with MySQL/PostgreSQL host
    and port taken from ``self.cluster``.
    """
    if not self.odbc_ini_path:
        return {}

    odbc_lib_dir = "/usr/lib/x86_64-linux-gnu/odbc/"

    sqlite_section = {
        "DSN": "sqlite3_odbc",
        "Database": "/tmp/sqliteodbc",
        "Driver": odbc_lib_dir + "libsqlite3odbc.so",
        "Setup": odbc_lib_dir + "libsqlite3odbc.so",
    }

    mysql_section = {
        "DSN": "mysql_odbc",
        "Driver": odbc_lib_dir + "libmyodbc.so",
        "Database": "clickhouse",
        "Uid": "root",
        "Pwd": "clickhouse",
        "Server": self.cluster.mysql_host,
    }

    postgres_section = {
        "DSN": "postgresql_odbc",
        "Database": "postgres",
        "UserName": "postgres",
        "Password": "mysecretpassword",
        "Port": str(self.cluster.postgres_port),
        "Servername": self.cluster.postgres_host,
        "Protocol": "9.3",
        "ReadOnly": "No",
        "RowVersioning": "No",
        "ShowSystemTables": "No",
        "Driver": odbc_lib_dir + "psqlodbca.so",
        "Setup": odbc_lib_dir + "libodbcpsqlS.so",
        "ConnSettings": "",
    }

    return {
        "SQLite3": sqlite_section,
        "MySQL": mysql_section,
        "PostgreSQL": postgres_section,
    }
def _create_odbc_config_file(self):
with open(self.odbc_ini_path.split(':')[0], 'w') as f:
2020-10-02 16:54:07 +00:00
for driver_setup in list(self.odbc_drivers.values()):
2018-08-22 15:42:27 +00:00
f.write("[{}]\n".format(driver_setup["DSN"]))
2020-10-02 16:54:07 +00:00
for key, value in list(driver_setup.items()):
2018-08-22 15:42:27 +00:00
if key != "DSN":
f.write(key + "=" + value + "\n")
def replace_config(self, path_to_config, replacement):
    """Overwrite ``path_to_config`` in the container with ``replacement`` using ``echo '...' >``."""
    overwrite_cmd = f"echo '{replacement}' > {path_to_config}"
    self.exec_in_container(["bash", "-c", overwrite_cmd])
def replace_in_config(self, path_to_config, replace, replacement):
    """Run ``sed -i 's/<replace>/<replacement>/g'`` on ``path_to_config`` in the container."""
    sed_cmd = "sed -i 's/{}/{}/g' {}".format(replace, replacement, path_to_config)
    self.exec_in_container(["bash", "-c", sed_cmd])
def create_dir(self, destroy_dir=True):
    """Create the instance directory and all the needed files there.

    Lays out ``configs`` (with ``conf.d``, ``config.d``, ``users.d`` and
    ``dictionaries``), ``database`` and ``logs`` under ``self.path``, copies
    base, embedded and custom configuration files into place, and finally
    renders the instance docker-compose file to ``self.docker_compose_path``.

    Args:
        destroy_dir: when True, an existing instance directory is removed
            first; when False and the directory already exists, nothing is
            done at all.

    Side effects visible on ``self``: sets ``config_d_dir`` and ``logs_dir``,
    and merges ``self.env_variables`` into ``self.cluster.env_variables``.
    """
    if destroy_dir:
        self.destroy_dir()
    elif p.exists(self.path):
        return

    os.makedirs(self.path)

    instance_config_dir = p.abspath(p.join(self.path, 'configs'))
    os.makedirs(instance_config_dir)

    print(f"Copy common default production configuration from {self.base_config_dir}. Files: {self.main_config_name}, {self.users_config_name}")

    shutil.copyfile(p.join(self.base_config_dir, self.main_config_name), p.join(instance_config_dir, self.main_config_name))
    shutil.copyfile(p.join(self.base_config_dir, self.users_config_name), p.join(instance_config_dir, self.users_config_name))

    logging.debug("Create directory for configuration generated in this helper")
    # used by all utils with any config
    conf_d_dir = p.abspath(p.join(instance_config_dir, 'conf.d'))
    os.mkdir(conf_d_dir)

    logging.debug("Create directory for common tests configuration")
    # used by server with main config.xml
    self.config_d_dir = p.abspath(p.join(instance_config_dir, 'config.d'))
    os.mkdir(self.config_d_dir)
    users_d_dir = p.abspath(p.join(instance_config_dir, 'users.d'))
    os.mkdir(users_d_dir)
    dictionaries_dir = p.abspath(p.join(instance_config_dir, 'dictionaries'))
    os.mkdir(dictionaries_dir)

    def write_embedded_config(name, dest_dir, fix_log_level=False):
        """Copy a config shipped next to this helper, adapting the root tag name."""
        with open(p.join(HELPERS_DIR, name), 'r') as f:
            data = f.read()
        data = data.replace('clickhouse', self.config_root_name)
        if fix_log_level:
            data = data.replace('<level>test</level>', '<level>trace</level>')
        with open(p.join(dest_dir, name), 'w') as r:
            r.write(data)

    logging.debug("Copy common configuration from helpers")
    # The file is named with 0_ prefix to be processed before other configuration overloads.
    if self.copy_common_configs:
        # For any tag other than 'latest' the 'test' log level is replaced with 'trace'.
        need_fix_log_level = self.tag != 'latest'
        write_embedded_config('0_common_instance_config.xml', self.config_d_dir, need_fix_log_level)

    write_embedded_config('0_common_instance_users.xml', users_d_dir)

    if len(self.custom_dictionaries_paths):
        write_embedded_config('0_common_enable_dictionaries.xml', self.config_d_dir)

    logging.debug("Generate and write macros file")
    macros = self.macros.copy()
    macros['instance'] = self.name
    with open(p.join(conf_d_dir, 'macros.xml'), 'w') as macros_config:
        macros_config.write(self.dict_to_xml({"macros": macros}))

    # Put ZooKeeper config
    if self.with_zookeeper:
        shutil.copy(self.zookeeper_config_path, conf_d_dir)

    if self.with_kerberized_kafka or self.with_kerberized_hdfs:
        shutil.copytree(self.kerberos_secrets_dir, p.abspath(p.join(self.path, 'secrets')))

    # Copy config.d configs
    logging.debug(f"Copy custom test config files {self.custom_main_config_paths} to {self.config_d_dir}")
    for path in self.custom_main_config_paths:
        shutil.copy(path, self.config_d_dir)

    # Copy users.d configs
    for path in self.custom_user_config_paths:
        shutil.copy(path, users_d_dir)

    # Copy dictionaries configs to configs/dictionaries
    for path in self.custom_dictionaries_paths:
        shutil.copy(path, dictionaries_dir)

    db_dir = p.abspath(p.join(self.path, 'database'))
    logging.debug(f"Setup database dir {db_dir}")
    if self.clickhouse_path_dir is not None:
        logging.debug(f"Database files taken from {self.clickhouse_path_dir}")
        shutil.copytree(self.clickhouse_path_dir, db_dir)
        logging.debug(f"Database copied from {self.clickhouse_path_dir} to {db_dir}")
    else:
        os.mkdir(db_dir)

    logs_dir = p.abspath(p.join(self.path, 'logs'))
    logging.debug(f"Setup logs dir {logs_dir}")
    os.mkdir(logs_dir)
    self.logs_dir = logs_dir

    # Compose services this instance depends on, in a fixed order.
    depends_on = []

    if self.with_mysql_client:
        depends_on.append(self.cluster.mysql_client_host)

    if self.with_mysql:
        depends_on.append("mysql57")

    if self.with_mysql8:
        depends_on.append("mysql80")

    if self.with_mysql_cluster:
        depends_on.append("mysql57")
        depends_on.append("mysql2")
        depends_on.append("mysql3")
        depends_on.append("mysql4")

    if self.with_postgres_cluster:
        depends_on.append("postgres2")
        depends_on.append("postgres3")
        depends_on.append("postgres4")

    if self.with_kafka:
        depends_on.append("kafka1")
        depends_on.append("schema-registry")

    if self.with_kerberized_kafka:
        depends_on.append("kerberized_kafka1")

    if self.with_kerberized_hdfs:
        depends_on.append("kerberizedhdfs1")

    if self.with_rabbitmq:
        depends_on.append("rabbitmq1")

    if self.with_zookeeper:
        depends_on.append("zoo1")
        depends_on.append("zoo2")
        depends_on.append("zoo3")

    if self.with_minio:
        depends_on.append("minio1")

    self.cluster.env_variables.update(self.env_variables)

    odbc_ini_path = ""
    if self.odbc_ini_path:
        self._create_odbc_config_file()
        odbc_ini_path = '- ' + self.odbc_ini_path

    entrypoint_cmd = self.clickhouse_start_command

    if self.stay_alive:
        entrypoint_cmd = CLICKHOUSE_STAY_ALIVE_COMMAND.replace("{main_config_file}", self.main_config_name)
    else:
        # Render the start command as a bracketed list of double-quoted words.
        entrypoint_cmd = '[' + ', '.join(map(lambda x: '"' + x + '"', entrypoint_cmd.split())) + ']'

    logging.debug("Entrypoint cmd: {}".format(entrypoint_cmd))

    networks = app_net = ipv4_address = ipv6_address = net_aliases = net_alias1 = ""
    if self.ipv4_address is not None or self.ipv6_address is not None or self.hostname != self.name:
        networks = "networks:"
        app_net = "default:"
        if self.ipv4_address is not None:
            ipv4_address = "ipv4_address: " + self.ipv4_address
        if self.ipv6_address is not None:
            ipv6_address = "ipv6_address: " + self.ipv6_address
        if self.hostname != self.name:
            net_aliases = "aliases:"
            net_alias1 = "- " + self.hostname

    if not self.with_installed_binary:
        binary_volume = "- " + self.server_bin_path + ":/usr/bin/clickhouse"
        odbc_bridge_volume = "- " + self.odbc_bridge_bin_path + ":/usr/bin/clickhouse-odbc-bridge"
        library_bridge_volume = "- " + self.library_bridge_bin_path + ":/usr/bin/clickhouse-library-bridge"
    else:
        # The *_fresh paths are the ones restart_with_latest_version copies from.
        binary_volume = "- " + self.server_bin_path + ":/usr/share/clickhouse_fresh"
        odbc_bridge_volume = "- " + self.odbc_bridge_bin_path + ":/usr/share/clickhouse-odbc-bridge_fresh"
        library_bridge_volume = "- " + self.library_bridge_bin_path + ":/usr/share/clickhouse-library-bridge_fresh"

    external_dirs_volumes = ""
    if self.external_dirs:
        for external_dir in self.external_dirs:
            external_dir_abs_path = p.abspath(p.join(self.path, external_dir.lstrip('/')))
            logging.info(f'external_dir_abs_path={external_dir_abs_path}')
            # makedirs so that nested external dirs (and dirs whose parent was
            # already created for another entry) do not fail.
            os.makedirs(external_dir_abs_path, exist_ok=True)
            external_dirs_volumes += "- " + external_dir_abs_path + ":" + external_dir + "\n"

    with open(self.docker_compose_path, 'w') as docker_compose:
        docker_compose.write(DOCKER_COMPOSE_TEMPLATE.format(
            image=self.image,
            tag=self.tag,
            name=self.name,
            hostname=self.hostname,
            binary_volume=binary_volume,
            odbc_bridge_volume=odbc_bridge_volume,
            library_bridge_volume=library_bridge_volume,
            instance_config_dir=instance_config_dir,
            config_d_dir=self.config_d_dir,
            db_dir=db_dir,
            external_dirs_volumes=external_dirs_volumes,
            tmpfs=str(self.tmpfs),
            logs_dir=logs_dir,
            depends_on=str(depends_on),
            user=os.getuid(),
            env_file=self.env_file,
            odbc_ini_path=odbc_ini_path,
            keytab_path=self.keytab_path,
            krb5_conf=self.krb5_conf,
            entrypoint_cmd=entrypoint_cmd,
            networks=networks,
            app_net=app_net,
            ipv4_address=ipv4_address,
            ipv6_address=ipv6_address,
            net_aliases=net_aliases,
            net_alias1=net_alias1,
        ))
def destroy_dir(self):
    """Remove the instance directory ``self.path`` recursively, if it exists."""
    if not p.exists(self.path):
        return
    shutil.rmtree(self.path)
def get_backuped_s3_objects(self, disk, backup_name):
    """List object names referenced by files of a backup on the given disk.

    Runs ``find ... -type f -exec grep -o <pattern> {} ;`` over the backup's
    ``store`` directory inside the container and returns the output split on
    newlines (so a trailing newline yields a trailing empty string).
    """
    store_dir = f'/var/lib/clickhouse/disks/{disk}/shadow/{backup_name}/store'
    object_name_pattern = 'r[01]\\{64\\}-file-[[:lower:]]\\{32\\}'
    find_cmd = ['find', store_dir, '-type', 'f',
                '-exec', 'grep', '-o', object_name_pattern, '{}', ';']
    listing = self.exec_in_container(find_cmd)
    return listing.split('\n')
class ClickHouseKiller(object):
    """Context manager that keeps ClickHouse killed for the duration of a ``with`` block.

    On entry the node's server is stopped with ``kill=True``; on exit it is
    started again. Exceptions raised inside the block are not suppressed.
    """

    def __init__(self, clickhouse_node):
        self.clickhouse_node = clickhouse_node

    def __enter__(self):
        node = self.clickhouse_node
        node.stop_clickhouse(kill=True)

    def __exit__(self, exc_type, exc_val, exc_tb):
        node = self.clickhouse_node
        node.start_clickhouse()