2019-11-20 11:56:38 +00:00
import base64
import errno
2020-10-02 16:54:07 +00:00
import http . client
2020-08-12 08:55:04 +00:00
import logging
2017-05-19 18:54:05 +00:00
import os
import os . path as p
2020-08-12 08:55:04 +00:00
import pprint
2017-05-23 14:24:04 +00:00
import pwd
2017-05-19 18:54:05 +00:00
import re
import shutil
import socket
2019-11-20 11:56:38 +00:00
import subprocess
2017-05-19 18:54:05 +00:00
import time
2020-09-09 11:11:59 +00:00
import traceback
2020-10-02 16:54:07 +00:00
import urllib . parse
2020-09-16 04:26:10 +00:00
import cassandra . cluster
import docker
import psycopg2
import pymongo
import pymysql
import requests
2021-01-27 09:50:11 +00:00
from dict2xml import dict2xml
2020-10-02 16:54:07 +00:00
from confluent_kafka . avro . cached_schema_registry_client import CachedSchemaRegistryClient
2019-11-20 11:56:38 +00:00
from kazoo . client import KazooClient
from kazoo . exceptions import KazooException
from minio import Minio
2017-05-19 18:54:05 +00:00
2019-11-20 11:56:38 +00:00
from . client import Client
2018-12-05 13:24:45 +00:00
from . hdfs_api import HDFSApi
2017-05-19 18:54:05 +00:00
# Directory that contains this helpers module.
HELPERS_DIR = p.dirname(__file__)

# Three directory levels above this file (used as the ClickHouse checkout root).
CLICKHOUSE_ROOT_DIR = p.join(HELPERS_DIR, "../../..")

# Where the docker-compose files live inside a local checkout; used as the
# last-resort fallback by get_docker_compose_path().
LOCAL_DOCKER_COMPOSE_DIR = p.join(CLICKHOUSE_ROOT_DIR, "docker/test/integration/runner/compose/")

# Default file name written by _create_env_file().
DEFAULT_ENV_NAME = 'env_file'

# NOTE(review): presumably the separator line printed by sanitizers; its use is
# outside the visible part of the file.
SANITIZER_SIGN = "=================="
2019-11-20 11:56:38 +00:00
2018-07-28 14:38:08 +00:00
def _create_env_file(path, variables, fname=DEFAULT_ENV_NAME):
    """Write ``variables`` as ``NAME=value`` lines into the file ``path/fname``.

    :param path: directory in which the file is created.
    :param variables: mapping of variable names to values. Values are
        formatted with ``str.format``, so non-string values (ints, paths)
        are accepted as well.
    :param fname: name of the file inside ``path``.
    :return: the full path of the written file.
    """
    full_path = os.path.join(path, fname)
    with open(full_path, 'w') as f:
        f.writelines("{}={}\n".format(var, value) for var, value in variables.items())
    return full_path
2021-01-22 14:27:23 +00:00
def run_and_check(args, env=None, shell=False):
    """Run a command, capturing its output, and raise if it exits non-zero.

    :param args: command passed to ``subprocess.run`` (list, or string when
        ``shell`` is true).
    :param env: optional environment mapping for the child process.
    :param shell: forwarded to ``subprocess.run``.
    :raises Exception: when the return code is not zero; the message contains
        the command, the return code and the captured stderr.
    """
    res = subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env, shell=shell)
    if res.returncode == 0:
        return

    # Decode leniently: a tool that emits non-UTF-8 bytes must not replace the
    # real failure with a UnicodeDecodeError.
    stderr = res.stderr.decode('utf-8', errors='replace')
    stdout = res.stdout.decode('utf-8', errors='replace')

    # check_call(...) from subprocess does not print stderr, so we do it manually
    print('Stderr:\n{}\n'.format(stderr))
    print('Stdout:\n{}\n'.format(stdout))
    raise Exception('Command {} return non-zero code {}: {}'.format(args, res.returncode, stderr))
2021-01-22 14:27:23 +00:00
2018-09-28 14:53:20 +00:00
def subprocess_check_call(args):
    """Run ``args`` via run_and_check(), which raises when the command fails."""
    # Uncomment for debugging
    # print('run:', ' '.join(args))
    run_and_check(args)
2018-09-28 14:53:20 +00:00
2019-11-20 11:56:38 +00:00
2018-09-28 14:53:20 +00:00
def subprocess_call(args):
    """Run ``args`` with ``subprocess.call``; the exit code is discarded."""
    # Uncomment for debugging..;
    # print('run:', ' '.join(args))
    subprocess.call(args)
2019-01-29 17:17:31 +00:00
def get_odbc_bridge_path():
    """Return the path of the clickhouse-odbc-bridge binary.

    Resolution order:
    1. ``CLICKHOUSE_TESTS_ODBC_BRIDGE_BIN_PATH`` if set;
    2. ``clickhouse-odbc-bridge`` next to ``CLICKHOUSE_TESTS_SERVER_BIN_PATH`` if that is set;
    3. ``/usr/bin/clickhouse-odbc-bridge``.
    """
    explicit_path = os.environ.get('CLICKHOUSE_TESTS_ODBC_BRIDGE_BIN_PATH')
    if explicit_path is not None:
        return explicit_path

    server_bin = os.environ.get('CLICKHOUSE_TESTS_SERVER_BIN_PATH')
    if server_bin is None:
        return '/usr/bin/clickhouse-odbc-bridge'
    return os.path.join(os.path.dirname(server_bin), 'clickhouse-odbc-bridge')
2020-09-16 04:26:10 +00:00
2020-07-06 13:45:54 +00:00
def get_docker_compose_path():
    """Return the directory that holds the docker-compose YAML files.

    Uses the directory part of ``DOCKER_COMPOSE_DIR`` when that variable is
    set, otherwise ``/compose`` if it exists, otherwise
    ``LOCAL_DOCKER_COMPOSE_DIR`` (announcing the fallback on stdout).
    """
    configured = os.environ.get('DOCKER_COMPOSE_DIR')
    if configured is not None:
        return os.path.dirname(configured)

    runner_dir = os.path.dirname('/compose/')  # default in docker runner container
    if os.path.exists(runner_dir):
        return runner_dir

    print("Fallback docker_compose_path to LOCAL_DOCKER_COMPOSE_DIR:{}".format(LOCAL_DOCKER_COMPOSE_DIR))
    return LOCAL_DOCKER_COMPOSE_DIR
2019-01-29 17:17:31 +00:00
2017-05-19 18:54:05 +00:00
class ClickHouseCluster :
""" ClickHouse cluster with several instances and (possibly) ZooKeeper.
Add instances with several calls to add_instance ( ) , then start them with the start ( ) call .
Directories for instances are created in the directory of base_path . After cluster is started ,
these directories will contain logs , database files , docker - compose config , ClickHouse configs etc .
"""
2020-08-14 15:51:28 +00:00
def __init__(self, base_path, name=None, base_config_dir=None, server_bin_path=None, client_bin_path=None,
             odbc_bridge_bin_path=None, zookeeper_config_path=None, custom_dockerd_host=None):
    """Prepare an empty cluster description; nothing is started here.

    base_path - a path whose directory part becomes the cluster base directory.
    name - optional cluster name; it is appended to the docker-compose project
           name and to the instances directory name.
    base_config_dir - default config directory for instances; falls back to
           CLICKHOUSE_TESTS_BASE_CONFIG_DIR, then /etc/clickhouse-server/.
    server_bin_path, client_bin_path, odbc_bridge_bin_path - binaries to use;
           each falls back to its CLICKHOUSE_TESTS_* variable or a /usr/bin default.
    zookeeper_config_path - path relative to the base directory; defaults to
           zookeeper_config.xml in the helpers directory.
    custom_dockerd_host - passed to docker-compose as --host; falls back to
           CLICKHOUSE_TESTS_DOCKERD_HOST.
    """
    # Dump the whole environment to make CI failures easier to diagnose.
    for param, value in os.environ.items():
        print("ENV %40s %s" % (param, value))

    self.base_dir = p.dirname(base_path)
    self.name = name if name is not None else ''

    self.base_config_dir = base_config_dir or os.environ.get('CLICKHOUSE_TESTS_BASE_CONFIG_DIR',
                                                             '/etc/clickhouse-server/')
    self.server_bin_path = p.realpath(
        server_bin_path or os.environ.get('CLICKHOUSE_TESTS_SERVER_BIN_PATH', '/usr/bin/clickhouse'))
    self.odbc_bridge_bin_path = p.realpath(odbc_bridge_bin_path or get_odbc_bridge_path())
    self.client_bin_path = p.realpath(
        client_bin_path or os.environ.get('CLICKHOUSE_TESTS_CLIENT_BIN_PATH', '/usr/bin/clickhouse-client'))
    if zookeeper_config_path:
        self.zookeeper_config_path = p.join(self.base_dir, zookeeper_config_path)
    else:
        self.zookeeper_config_path = p.join(HELPERS_DIR, 'zookeeper_config.xml')

    project_name = pwd.getpwuid(os.getuid()).pw_name + p.basename(self.base_dir) + self.name
    # docker-compose removes everything non-alphanumeric from project names so we do it too.
    self.project_name = re.sub(r'[^a-z0-9]', '', project_name.lower())
    self.instances_dir = p.join(self.base_dir, '_instances' + ('' if not self.name else '_' + self.name))
    self.docker_logs_path = p.join(self.instances_dir, 'docker.log')

    custom_dockerd_host = custom_dockerd_host or os.environ.get('CLICKHOUSE_TESTS_DOCKERD_HOST')
    self.docker_api_version = os.environ.get("DOCKER_API_VERSION")
    self.docker_base_tag = os.environ.get("DOCKER_BASE_TAG", "latest")

    self.base_cmd = ['docker-compose']
    if custom_dockerd_host:
        self.base_cmd += ['--host', custom_dockerd_host]
    self.base_cmd += ['--project-name', self.project_name]

    # Per-service docker-compose commands; filled in by add_instance().
    self.base_zookeeper_cmd = None
    self.base_mysql_cmd = []
    self.base_kafka_cmd = []
    self.base_kerberized_kafka_cmd = []
    self.base_rabbitmq_cmd = []
    self.base_cassandra_cmd = []
    # The commands below used to be created only by add_instance(); initialise
    # them here too so that reading them on a cluster without the service does
    # not raise AttributeError.
    self.base_postgres_cmd = []
    self.base_hdfs_cmd = []
    self.base_kerberized_hdfs_cmd = []
    self.base_mongo_cmd = []
    self.base_redis_cmd = []
    self.base_minio_cmd = []
    self.pre_zookeeper_commands = []

    self.instances = {}
    self.with_zookeeper = False
    self.with_mysql = False
    self.with_postgres = False
    self.with_kafka = False
    self.with_kerberized_kafka = False
    self.with_rabbitmq = False
    self.with_odbc_drivers = False
    self.with_hdfs = False
    self.with_kerberized_hdfs = False
    self.with_mongo = False
    self.with_net_trics = False
    self.with_redis = False
    self.with_cassandra = False

    self.with_minio = False
    self.minio_certs_dir = None
    self.minio_host = "minio1"
    self.minio_bucket = "root"
    self.minio_bucket_2 = "root2"
    self.minio_port = 9001
    self.minio_client = None  # type: Minio
    self.minio_redirect_host = "proxy1"
    self.minio_redirect_port = 8080

    # available when with_kafka == True
    self.schema_registry_client = None
    self.schema_registry_host = "schema-registry"
    self.schema_registry_port = 8081

    self.zookeeper_use_tmpfs = True

    self.docker_client = None
    self.is_up = False
    print("CLUSTER INIT base_config_dir:{}".format(self.base_config_dir))
2017-05-19 18:54:05 +00:00
2018-09-07 11:51:51 +00:00
def get_client_cmd(self):
    """Return the client command line.

    When the configured client binary is the multi-tool ``clickhouse``
    executable, the ``client`` sub-command is appended.
    """
    if p.basename(self.client_bin_path) != 'clickhouse':
        return self.client_bin_path
    return self.client_bin_path + " client"
2020-10-30 19:40:16 +00:00
def add_instance(self, name, base_config_dir=None, main_configs=None, user_configs=None, dictionaries=None,
                 macros=None,
                 with_zookeeper=False, with_mysql=False, with_kafka=False, with_kerberized_kafka=False, with_rabbitmq=False,
                 clickhouse_path_dir=None,
                 with_odbc_drivers=False, with_postgres=False, with_hdfs=False, with_kerberized_hdfs=False, with_mongo=False,
                 with_redis=False, with_minio=False, with_cassandra=False,
                 hostname=None, env_variables=None, image="yandex/clickhouse-integration-test", tag=None,
                 stay_alive=False, ipv4_address=None, ipv6_address=None, with_installed_binary=False, tmpfs=None,
                 zookeeper_docker_compose_path=None, zookeeper_use_tmpfs=True, minio_certs_dir=None):
    """Add an instance to the cluster.

    name - the name of the instance directory and the value of the 'instance' macro in ClickHouse.
    base_config_dir - a directory with config.xml and users.xml files which will be copied to /etc/clickhouse-server/ directory
    main_configs - a list of config files that will be added to config.d/ directory
    user_configs - a list of config files that will be added to users.d/ directory
    with_zookeeper - if True, add ZooKeeper configuration to configs and ZooKeeper instances to the cluster.
    with_<service> - the first instance that requests a service adds that
        service's docker-compose file to the cluster commands; with_odbc_drivers
        additionally pulls in MySQL and Postgres.
    env_variables - extra environment for the instance; the mapping is copied,
        the caller's dict is not modified.
    tag - image tag, defaults to the cluster's docker_base_tag.

    Returns the created ClickHouseInstance. Raises Exception if the cluster is
    already up or the name is already taken.
    """
    if self.is_up:
        raise Exception("Can\'t add instance %s: cluster is already up!" % name)
    if name in self.instances:
        raise Exception("Can\'t add instance `%s': there is already an instance with the same name!" % name)

    if tag is None:
        tag = self.docker_base_tag

    # Copy before adding our own key so the caller's dict is left untouched.
    env_variables = dict(env_variables) if env_variables else {}
    # Code coverage files will be placed in database directory
    # (affect only WITH_COVERAGE=1 build)
    env_variables['LLVM_PROFILE_FILE'] = '/var/lib/clickhouse/server_%h_%p_%m.profraw'

    instance = ClickHouseInstance(
        cluster=self,
        base_path=self.base_dir,
        name=name,
        base_config_dir=base_config_dir if base_config_dir else self.base_config_dir,
        custom_main_configs=main_configs or [],
        custom_user_configs=user_configs or [],
        custom_dictionaries=dictionaries or [],
        macros=macros or {},
        with_zookeeper=with_zookeeper,
        zookeeper_config_path=self.zookeeper_config_path,
        with_mysql=with_mysql,
        with_kafka=with_kafka,
        with_kerberized_kafka=with_kerberized_kafka,
        with_rabbitmq=with_rabbitmq,
        with_kerberized_hdfs=with_kerberized_hdfs,
        with_mongo=with_mongo,
        with_redis=with_redis,
        with_minio=with_minio,
        with_cassandra=with_cassandra,
        server_bin_path=self.server_bin_path,
        odbc_bridge_bin_path=self.odbc_bridge_bin_path,
        clickhouse_path_dir=clickhouse_path_dir,
        with_odbc_drivers=with_odbc_drivers,
        hostname=hostname,
        env_variables=env_variables,
        image=image,
        tag=tag,
        stay_alive=stay_alive,
        ipv4_address=ipv4_address,
        ipv6_address=ipv6_address,
        with_installed_binary=with_installed_binary,
        tmpfs=tmpfs or [])

    docker_compose_yml_dir = get_docker_compose_path()
    net_compose_file = p.join(docker_compose_yml_dir, 'docker_compose_net.yml')

    def bundled(file_name):
        """Path of a compose file shipped in the docker-compose directory."""
        return p.join(docker_compose_yml_dir, file_name)

    def enable_service(compose_file):
        """Add compose_file to the cluster command and return a standalone command for it."""
        self.base_cmd.extend(['--file', compose_file])
        return ['docker-compose', '--project-name', self.project_name, '--file', compose_file]

    self.instances[name] = instance
    if ipv4_address is not None or ipv6_address is not None:
        self.with_net_trics = True
        self.base_cmd.extend(['--file', net_compose_file])

    self.base_cmd.extend(['--file', instance.docker_compose_path])

    # Service commands created by this call; they may get the net file appended below.
    cmds = []
    if with_zookeeper and not self.with_zookeeper:
        if not zookeeper_docker_compose_path:
            zookeeper_docker_compose_path = bundled('docker_compose_zookeeper.yml')

        self.with_zookeeper = True
        self.zookeeper_use_tmpfs = zookeeper_use_tmpfs
        self.base_zookeeper_cmd = enable_service(zookeeper_docker_compose_path)
        cmds.append(self.base_zookeeper_cmd)

    if with_mysql and not self.with_mysql:
        self.with_mysql = True
        self.base_mysql_cmd = enable_service(bundled('docker_compose_mysql.yml'))
        cmds.append(self.base_mysql_cmd)

    if with_postgres and not self.with_postgres:
        self.with_postgres = True
        self.base_postgres_cmd = enable_service(bundled('docker_compose_postgres.yml'))
        cmds.append(self.base_postgres_cmd)

    if with_odbc_drivers and not self.with_odbc_drivers:
        self.with_odbc_drivers = True
        # ODBC tests need both databases, so enable whichever is still missing.
        if not self.with_mysql:
            self.with_mysql = True
            self.base_mysql_cmd = enable_service(bundled('docker_compose_mysql.yml'))
            cmds.append(self.base_mysql_cmd)

        if not self.with_postgres:
            self.with_postgres = True
            self.base_postgres_cmd = enable_service(bundled('docker_compose_postgres.yml'))
            cmds.append(self.base_postgres_cmd)

    if with_kafka and not self.with_kafka:
        self.with_kafka = True
        self.base_kafka_cmd = enable_service(bundled('docker_compose_kafka.yml'))
        cmds.append(self.base_kafka_cmd)

    if with_kerberized_kafka and not self.with_kerberized_kafka:
        self.with_kerberized_kafka = True
        self.base_kerberized_kafka_cmd = enable_service(bundled('docker_compose_kerberized_kafka.yml'))
        cmds.append(self.base_kerberized_kafka_cmd)

    if with_rabbitmq and not self.with_rabbitmq:
        self.with_rabbitmq = True
        self.base_rabbitmq_cmd = enable_service(bundled('docker_compose_rabbitmq.yml'))
        cmds.append(self.base_rabbitmq_cmd)

    if with_hdfs and not self.with_hdfs:
        self.with_hdfs = True
        self.base_hdfs_cmd = enable_service(bundled('docker_compose_hdfs.yml'))
        cmds.append(self.base_hdfs_cmd)

    if with_kerberized_hdfs and not self.with_kerberized_hdfs:
        self.with_kerberized_hdfs = True
        self.base_kerberized_hdfs_cmd = enable_service(bundled('docker_compose_kerberized_hdfs.yml'))
        cmds.append(self.base_kerberized_hdfs_cmd)

    if with_mongo and not self.with_mongo:
        self.with_mongo = True
        self.base_mongo_cmd = enable_service(bundled('docker_compose_mongo.yml'))
        cmds.append(self.base_mongo_cmd)

    # NOTE(review): only the commands collected above get the net file; the
    # redis/minio/cassandra commands created below do not, and neither do
    # commands created by earlier add_instance() calls. Kept as in the
    # original - confirm whether that is intended.
    if self.with_net_trics:
        for cmd in cmds:
            cmd.extend(['--file', net_compose_file])

    if with_redis and not self.with_redis:
        self.with_redis = True
        self.base_redis_cmd = enable_service(bundled('docker_compose_redis.yml'))

    if with_minio and not self.with_minio:
        self.with_minio = True
        self.minio_certs_dir = minio_certs_dir
        self.base_minio_cmd = enable_service(bundled('docker_compose_minio.yml'))
        cmds.append(self.base_minio_cmd)

    if with_cassandra and not self.with_cassandra:
        self.with_cassandra = True
        self.base_cassandra_cmd = enable_service(bundled('docker_compose_cassandra.yml'))

    print("Cluster name:{} project_name:{}. Added instance name:{} tag:{} base_cmd:{} docker_compose_yml_dir:{}".format(
        self.name, self.project_name, name, tag, self.base_cmd, docker_compose_yml_dir))
    return instance
2017-05-30 11:49:17 +00:00
def get_instance_docker_id(self, instance_name):
    """Return the container name docker-compose gives to ``instance_name``.

    The name has the form ``<project_name>_<instance_name>_1``.
    """
    # According to how docker-compose names containers.
    return "_".join((self.project_name, instance_name, "1"))
2019-06-04 20:59:31 +00:00
def _replace(self, path, what, to):
    """Rewrite the file at ``path`` with every occurrence of ``what`` replaced by ``to``."""
    with open(path, 'r') as source:
        original_text = source.read()
    updated_text = original_text.replace(what, to)
    with open(path, 'w') as target:
        target.write(updated_text)
def restart_instance_with_ip_change(self, node, new_ip):
    """Recreate the container of ``node`` with a different static IP address.

    The address is patched into the node's docker-compose file, the container
    is stopped, removed and recreated, and the node's ``ip_address`` and
    ``client`` are refreshed before waiting (up to 20 seconds) for start.

    :param node: instance that was added with a static ipv4/ipv6 address.
    :param new_ip: new address; anything containing ':' is treated as IPv6.
    :return: the same ``node`` object.
    :raises Exception: if the node has no static address of the matching family.
    """
    # A dotted-quad IPv4 address never contains ':', while every IPv6 address
    # does - including fully written ones that have no '::' shorthand.
    if ':' in new_ip:
        if node.ipv6_address is None:
            raise Exception("You should specify ipv6_address in add_instance method")
        self._replace(node.docker_compose_path, node.ipv6_address, new_ip)
        node.ipv6_address = new_ip
    else:
        if node.ipv4_address is None:
            raise Exception("You should specify ipv4_address in add_instance method")
        self._replace(node.docker_compose_path, node.ipv4_address, new_ip)
        node.ipv4_address = new_ip

    run_and_check(self.base_cmd + ["stop", node.name])
    run_and_check(self.base_cmd + ["rm", "--force", "--stop", node.name])
    run_and_check(self.base_cmd + ["up", "--force-recreate", "--no-deps", "-d", node.name])
    node.ip_address = self.get_instance_ip(node.name)
    node.client = Client(node.ip_address, command=self.client_bin_path)
    start_deadline = time.time() + 20.0  # seconds
    node.wait_for_start(start_deadline)
    return node
2017-05-30 11:49:17 +00:00
2018-01-25 18:14:37 +00:00
def get_instance_ip(self, instance_name):
    """Return the IP address of the instance's container.

    The address is read from the first entry of the container's
    ``NetworkSettings.Networks`` attribute.
    """
    print("get_instance_ip instance_name={}".format(instance_name))
    container = self.docker_client.containers.get(self.get_instance_docker_id(instance_name))
    # for cont in self.docker_client.containers.list():
    #     print("CONTAINERS LIST: ID={} NAME={} STATUS={}".format(cont.id, cont.name, cont.status))
    networks = container.attrs['NetworkSettings']['Networks']
    first_network = list(networks.values())[0]
    return first_network['IPAddress']
2018-01-25 18:14:37 +00:00
2020-04-29 08:39:00 +00:00
def get_container_id(self, instance_name):
    """Return the ``Id`` attribute of the instance's docker container."""
    container_name = self.get_instance_docker_id(instance_name)
    container = self.docker_client.containers.get(container_name)
    return container.attrs['Id']
def get_container_logs(self, instance_name):
    """Return the docker logs of the instance's container decoded to ``str``."""
    raw_logs = self.docker_client.api.logs(self.get_container_id(instance_name))
    return raw_logs.decode()
2020-04-29 08:39:00 +00:00
2020-08-03 14:40:02 +00:00
def exec_in_container(self, container_id, cmd, detach=False, nothrow=False, **kwargs):
    """Execute ``cmd`` inside a container through the docker API.

    :param container_id: container to run the command in.
    :param cmd: command as a list of arguments or as a single string.
    :param detach: forwarded to ``exec_start``; when true the raw result of
        ``exec_start`` is returned instead of decoded output.
    :param nothrow: when true a non-zero exit code is only printed.
    :param kwargs: forwarded to ``exec_create`` (e.g. ``user``).
    :return: decoded command output, or the raw ``exec_start`` result if detached.
    :raises Exception: on a non-zero exit code unless ``nothrow`` is set.
    """
    api = self.docker_client.api
    exec_id = api.exec_create(container_id, cmd, **kwargs)
    output = api.exec_start(exec_id, detach=detach)

    exit_code = api.exec_inspect(exec_id)['ExitCode']
    if exit_code:
        # Dump container and image details to help diagnose the failure.
        container_info = api.inspect_container(container_id)
        image_id = container_info.get('Image')
        image_info = api.inspect_image(image_id)
        print("Command failed in container {}:".format(container_id))
        pprint.pprint(container_info)
        print("")
        print("Container {} uses image {}:".format(container_id, image_id))
        pprint.pprint(image_info)
        print("")
        # A string command must not be joined character by character.
        cmd_text = cmd if isinstance(cmd, str) else ' '.join(cmd)
        message = 'Cmd "{}" failed in container {}. Return code {}. Output: {}'.format(cmd_text, container_id,
                                                                                       exit_code, output)
        if nothrow:
            print(message)
        else:
            raise Exception(message)
    if not detach:
        return output.decode()
    return output
def copy_file_to_container(self, container_id, local_path, dest_path):
    """Copy a local file into a container.

    The file content is base64-encoded and decoded again by a ``bash``
    command run as root inside the container. The file is read as bytes, so
    binary files and files with any text encoding are transferred unchanged.

    :param container_id: target container.
    :param local_path: file on the host to copy.
    :param dest_path: destination path inside the container; it is inserted
        into the shell command as is.
    """
    with open(local_path, "rb") as fdata:
        data = fdata.read()
    encoded = base64.b64encode(data).decode("ascii")
    self.exec_in_container(container_id,
                           ["bash", "-c", "echo {} | base64 --decode > {}".format(encoded, dest_path)],
                           user='root')
2018-08-22 15:42:27 +00:00
def wait_mysql_to_start(self, timeout=60):
    """Poll MySQL on 127.0.0.1:3308 until a connection succeeds.

    Retries every 0.5 seconds. When ``timeout`` seconds pass without success,
    the docker-compose service list is printed and an Exception is raised.
    """
    began = time.time()
    while time.time() - began < timeout:
        try:
            connection = pymysql.connect(user='root', password='clickhouse', host='127.0.0.1', port=3308)
            connection.close()
        except Exception as ex:
            print("Can't connect to MySQL " + str(ex))
            time.sleep(0.5)
        else:
            print("Mysql Started")
            return

    subprocess_call(['docker-compose', 'ps', '--services', '--all'])
    raise Exception("Cannot wait MySQL container")
2018-10-15 14:49:23 +00:00
def wait_postgres_to_start(self, timeout=60):
    """Poll Postgres on localhost until a connection succeeds.

    Retries every 0.5 seconds and raises an Exception once ``timeout``
    seconds have passed without a successful connection.
    """
    conn_string = "host='localhost' user='postgres' password='mysecretpassword'"
    began = time.time()
    while time.time() - began < timeout:
        try:
            psycopg2.connect(conn_string).close()
        except Exception as ex:
            print("Can't connect to Postgres " + str(ex))
            time.sleep(0.5)
        else:
            print("Postgres Started")
            return

    raise Exception("Cannot wait Postgres container")
2018-08-27 13:42:39 +00:00
def wait_zookeeper_to_start(self, timeout=60):
    """Wait until zoo1, zoo2 and zoo3 all answer a ``get_children('/')`` request.

    A failure on any node restarts the whole check after 0.5 seconds; an
    Exception is raised once ``timeout`` seconds have passed.
    """
    zookeeper_nodes = ('zoo1', 'zoo2', 'zoo3')
    began = time.time()
    while time.time() - began < timeout:
        try:
            for node_name in zookeeper_nodes:
                client = self.get_kazoo_client(node_name)
                client.get_children('/')
        except Exception as ex:
            print("Can't connect to ZooKeeper " + str(ex))
            time.sleep(0.5)
        else:
            print("All instances of ZooKeeper started")
            return

    raise Exception("Cannot wait ZooKeeper container")
2018-01-25 18:14:37 +00:00
2020-10-30 19:40:16 +00:00
def make_hdfs_api(self, timeout=60, kerberized=False):
    """Create ``self.hdfs_api`` for the plain or the kerberized HDFS service.

    For the kerberized variant the keytab and krb config are taken from the
    ``secrets`` directory of the instance named 'node1', and the IPs of the
    'kerberizedhdfs1' and 'hdfskerberos' containers are looked up.
    NOTE(review): ``timeout`` is only passed on in the kerberized case.
    """
    if not kerberized:
        self.hdfs_api = HDFSApi(user="root", host="hdfs1")
        return

    keytab = p.abspath(p.join(self.instances['node1'].path, "secrets/clickhouse.keytab"))
    krb_conf = p.abspath(p.join(self.instances['node1'].path, "secrets/krb_long.conf"))
    hdfs_ip = self.get_instance_ip('kerberizedhdfs1')
    # print("kerberizedhdfs1 ip ", hdfs_ip)
    kdc_ip = self.get_instance_ip('hdfskerberos')
    # print("kdc_ip ", kdc_ip)
    self.hdfs_api = HDFSApi(
        user="root",
        timeout=timeout,
        kerberized=True,
        principal="root@TEST.CLICKHOUSE.TECH",
        keytab=keytab,
        krb_conf=krb_conf,
        host="kerberizedhdfs1",
        protocol="http",
        proxy_port=50070,
        data_port=1006,
        hdfs_ip=hdfs_ip,
        kdc_ip=kdc_ip,
    )
2020-09-10 10:02:46 +00:00
2020-10-30 19:40:16 +00:00
def wait_hdfs_to_start(self, timeout=60):
    """Wait until a test file can be written through ``self.hdfs_api``.

    Retries once per second and raises an Exception when ``timeout`` seconds
    pass without a successful write.
    """
    began = time.time()
    while time.time() - began < timeout:
        try:
            self.hdfs_api.write_data("/somefilewithrandomname222", "1")
        except Exception as ex:
            print("Can't connect to HDFS " + str(ex))
            time.sleep(1)
        else:
            print("Connected to HDFS and SafeMode disabled! ")
            return

    raise Exception("Can't wait HDFS to start")
2019-02-25 10:45:22 +00:00
def wait_mongo_to_start(self, timeout=30):
    """Poll MongoDB on localhost:27018 until its databases can be listed.

    Retries once per second. NOTE(review): unlike the other wait_* helpers
    this one returns silently when ``timeout`` expires; kept as is because
    callers may rely on it.
    """
    connection_str = 'mongodb://{user}:{password}@{host}:{port}'.format(
        host='localhost', port='27018', user='root', password='clickhouse')
    connection = pymongo.MongoClient(connection_str)
    start = time.time()
    while time.time() - start < timeout:
        try:
            # Reuse the probe result for the log line: the old
            # database_names() method no longer exists in PyMongo 4.
            database_names = connection.list_database_names()
            print("Connected to Mongo dbs:", database_names)
            return
        except Exception as ex:
            print("Can't connect to Mongo " + str(ex))
            time.sleep(1)
2020-07-10 19:42:18 +00:00
def wait_minio_to_start(self, timeout=30, secure=False):
    """Wait for Minio on ``localhost:9001`` and (re)create the test buckets.

    Each attempt lists the buckets to check connectivity, then drops and
    recreates ``self.minio_bucket`` and ``self.minio_bucket_2``. On success
    the client is stored in ``self.minio_client``.

    Args:
        timeout: maximum number of seconds to keep retrying.
        secure: passed to the Minio client to select TLS.

    Raises:
        Exception: if no attempt succeeded before the timeout expired.
    """
    minio_client = Minio('localhost:9001',
                         access_key='minio',
                         secret_key='minio123',
                         secure=secure)
    start = time.time()
    while time.time() - start < timeout:
        try:
            minio_client.list_buckets()
            print("Connected to Minio.")

            buckets = [self.minio_bucket, self.minio_bucket_2]

            for bucket in buckets:
                if minio_client.bucket_exists(bucket):
                    minio_client.remove_bucket(bucket)
                minio_client.make_bucket(bucket)
                # print() does not interpolate %s arguments the way logging does.
                print("S3 bucket '{}' created".format(bucket))

            self.minio_client = minio_client
            return
        except Exception as ex:
            print("Can't connect to Minio: {}".format(ex))
            time.sleep(1)

    raise Exception("Can't wait Minio to start")
2020-02-03 00:02:19 +00:00
def wait_schema_registry_to_start(self, timeout=10):
    """Poll the schema registry at ``http://localhost:8081`` until it responds.

    On the first successful request the client is stored in
    ``self.schema_registry_client``. Retries once per second for up to
    ``timeout`` seconds.

    NOTE: when the timeout expires the method returns silently without
    setting ``self.schema_registry_client``.
    """
    sr_client = CachedSchemaRegistryClient('http://localhost:8081')
    start = time.time()
    while time.time() - start < timeout:
        try:
            sr_client._send_request(sr_client.url)
            self.schema_registry_client = sr_client
            print("Connected to SchemaRegistry")
            return
        except Exception as ex:
            # Format the message explicitly; the old call printed a raw tuple.
            print("Can't connect to SchemaRegistry: {}".format(ex))
            time.sleep(1)
2020-05-27 13:54:39 +00:00
def wait_cassandra_to_start(self, timeout=30):
    """Retry connecting to the local Cassandra node until it accepts a session.

    One attempt per second is made for up to ``timeout`` seconds. If the
    timeout expires the method simply returns; no exception is raised.
    """
    cluster_handle = cassandra.cluster.Cluster(["localhost"], port="9043")
    began = time.time()
    while time.time() - began < timeout:
        try:
            cluster_handle.connect()
        except Exception as ex:
            logging.warning("Can't connect to Cassandra: %s", str(ex))
            time.sleep(1)
        else:
            logging.info("Connected to Cassandra")
            return
2017-05-19 18:54:05 +00:00
def start(self, destroy_dirs=True):
    """Bring up every enabled auxiliary service and all ClickHouse instances.

    Does nothing if the cluster is already up. Otherwise it:
      1. kills containers left over from a previous launch (best effort);
      2. optionally removes the instances directory and recreates the
         per-instance directories;
      3. starts each enabled service (ZooKeeper, MySQL, Postgres, Kafka,
         kerberized Kafka, RabbitMQ, HDFS, kerberized HDFS, Mongo, Redis,
         Minio, Cassandra) and waits for it where a wait helper exists;
      4. starts the ClickHouse containers and waits for each instance.

    Args:
        destroy_dirs: when true, remove ``self.instances_dir`` first and ask
            each instance to recreate its directory from scratch.

    Raises:
        Any exception from the setup steps is printed with its traceback and
        re-raised.
    """
    print("Cluster start called. is_up={}, destroy_dirs={}".format(self.is_up, destroy_dirs))
    if self.is_up:
        return

    # Just in case kill unstopped containers from previous launch
    try:
        print("Trying to kill unstopped containers...")

        if not subprocess_call(['docker-compose', 'kill']):
            subprocess_call(['docker-compose', 'down', '--volumes'])
        print("Unstopped containers killed")
    except Exception as e:
        # Best effort only: a failed cleanup must not prevent the start,
        # but KeyboardInterrupt/SystemExit should still propagate.
        print("Failed to kill unstopped containers: {}".format(repr(e)))

    try:
        if destroy_dirs and p.exists(self.instances_dir):
            print("Removing instances dir {}".format(self.instances_dir))
            shutil.rmtree(self.instances_dir)

        for instance in list(self.instances.values()):
            print('Setup directory for instance: {} destroy_dirs: {}'.format(instance.name, destroy_dirs))
            instance.create_dir(destroy_dir=destroy_dirs)

        self.docker_client = docker.from_env(version=self.docker_api_version)

        common_opts = ['up', '-d', '--force-recreate']

        if self.with_zookeeper and self.base_zookeeper_cmd:
            print('Setup ZooKeeper')
            env = os.environ.copy()
            if not self.zookeeper_use_tmpfs:
                env['ZK_FS'] = 'bind'
                for i in range(1, 4):
                    zk_data_path = self.instances_dir + '/zkdata' + str(i)
                    zk_log_data_path = self.instances_dir + '/zklog' + str(i)
                    if not os.path.exists(zk_data_path):
                        os.mkdir(zk_data_path)
                    if not os.path.exists(zk_log_data_path):
                        os.mkdir(zk_log_data_path)
                    env['ZK_DATA' + str(i)] = zk_data_path
                    env['ZK_DATA_LOG' + str(i)] = zk_log_data_path
            run_and_check(self.base_zookeeper_cmd + common_opts, env=env)
            for command in self.pre_zookeeper_commands:
                self.run_kazoo_commands_with_retries(command, repeats=5)
            self.wait_zookeeper_to_start(120)

        if self.with_mysql and self.base_mysql_cmd:
            print('Setup MySQL')
            subprocess_check_call(self.base_mysql_cmd + common_opts)
            self.wait_mysql_to_start(120)

        if self.with_postgres and self.base_postgres_cmd:
            print('Setup Postgres')
            subprocess_check_call(self.base_postgres_cmd + common_opts)
            self.wait_postgres_to_start(120)

        if self.with_kafka and self.base_kafka_cmd:
            print('Setup Kafka')
            subprocess_check_call(self.base_kafka_cmd + common_opts + ['--renew-anon-volumes'])
            self.kafka_docker_id = self.get_instance_docker_id('kafka1')
            self.wait_schema_registry_to_start(120)

        if self.with_kerberized_kafka and self.base_kerberized_kafka_cmd:
            print('Setup kerberized kafka')
            env = os.environ.copy()
            # NOTE: `instance` is the last instance visited by the
            # directory-setup loop above.
            env['KERBERIZED_KAFKA_DIR'] = instance.path + '/'
            run_and_check(self.base_kerberized_kafka_cmd + common_opts + ['--renew-anon-volumes'], env=env)
            self.kerberized_kafka_docker_id = self.get_instance_docker_id('kerberized_kafka1')

        if self.with_rabbitmq and self.base_rabbitmq_cmd:
            subprocess_check_call(self.base_rabbitmq_cmd + common_opts + ['--renew-anon-volumes'])
            self.rabbitmq_docker_id = self.get_instance_docker_id('rabbitmq1')

        if self.with_hdfs and self.base_hdfs_cmd:
            print('Setup HDFS')
            subprocess_check_call(self.base_hdfs_cmd + common_opts)
            self.make_hdfs_api()
            self.wait_hdfs_to_start(120)

        if self.with_kerberized_hdfs and self.base_kerberized_hdfs_cmd:
            print('Setup kerberized HDFS')
            env = os.environ.copy()
            # NOTE: same leaked loop variable as for kerberized kafka above.
            env['KERBERIZED_HDFS_DIR'] = instance.path + '/'
            run_and_check(self.base_kerberized_hdfs_cmd + common_opts, env=env)
            self.make_hdfs_api(kerberized=True)
            self.wait_hdfs_to_start(timeout=300)

        if self.with_mongo and self.base_mongo_cmd:
            print('Setup Mongo')
            run_and_check(self.base_mongo_cmd + common_opts)
            self.wait_mongo_to_start(30)

        if self.with_redis and self.base_redis_cmd:
            print('Setup Redis')
            subprocess_check_call(self.base_redis_cmd + ['up', '-d', '--force-recreate'])
            time.sleep(10)

        if self.with_minio and self.base_minio_cmd:
            env = os.environ.copy()
            prev_ca_certs = os.environ.get('SSL_CERT_FILE')
            if self.minio_certs_dir:
                minio_certs_dir = p.join(self.base_dir, self.minio_certs_dir)
                env['MINIO_CERTS_DIR'] = minio_certs_dir
                # Minio client (urllib3) uses SSL_CERT_FILE for certificate validation.
                os.environ['SSL_CERT_FILE'] = p.join(minio_certs_dir, 'public.crt')
            else:
                # Attach empty certificates directory to ensure non-secure mode.
                minio_certs_dir = p.join(self.instances_dir, 'empty_minio_certs_dir')
                # The directory may survive from a previous run when
                # destroy_dirs is False, so tolerate its existence.
                os.makedirs(minio_certs_dir, exist_ok=True)
                env['MINIO_CERTS_DIR'] = minio_certs_dir

            minio_start_cmd = self.base_minio_cmd + common_opts

            logging.info("Trying to create Minio instance by command %s", ' '.join(map(str, minio_start_cmd)))
            run_and_check(minio_start_cmd, env=env)

            try:
                logging.info("Trying to connect to Minio...")
                self.wait_minio_to_start(secure=self.minio_certs_dir is not None)
            finally:
                # Safely return previous value of SSL_CERT_FILE environment variable.
                if self.minio_certs_dir:
                    if prev_ca_certs:
                        os.environ['SSL_CERT_FILE'] = prev_ca_certs
                    else:
                        os.environ.pop('SSL_CERT_FILE', None)

        if self.with_cassandra and self.base_cassandra_cmd:
            subprocess_check_call(self.base_cassandra_cmd + ['up', '-d', '--force-recreate'])
            self.wait_cassandra_to_start()

        clickhouse_start_cmd = self.base_cmd + ['up', '-d', '--no-recreate']
        print("Trying to create ClickHouse instance by command {}".format(
            ' '.join(map(str, clickhouse_start_cmd))))
        subprocess.check_output(clickhouse_start_cmd)
        print("ClickHouse instance created")

        # All instances share one deadline rather than getting 20 seconds each.
        start_deadline = time.time() + 20.0  # seconds
        for instance in self.instances.values():
            instance.docker_client = self.docker_client
            instance.ip_address = self.get_instance_ip(instance.name)

            print("Waiting for ClickHouse start...")
            instance.wait_for_start(start_deadline)
            print("ClickHouse started")

            instance.client = Client(instance.ip_address, command=self.client_bin_path)

        self.is_up = True

    except BaseException as e:
        print("Failed to start cluster: ")
        print(str(e))
        # format_exc() returns the text; print_exc() returns None.
        print(traceback.format_exc())
        raise
2017-05-19 18:54:05 +00:00
def shutdown(self, kill=True):
    """Collect docker logs, stop the containers and reset the cluster state.

    The compose logs are written to ``self.docker_logs_path`` and scanned for
    ``SANITIZER_SIGN``. Containers are then stopped (falling back to a
    forced kill) when ``kill`` is true, and the compose project is torn down.
    Per-instance connection state is cleared, and the bind-mounted ZooKeeper
    directories are removed when tmpfs is not used.

    Args:
        kill: when true, stop the containers before ``down``.

    Raises:
        Exception: after cleanup, if a sanitizer report was found in the logs.
    """
    sanitizer_assert_instance = None
    with open(self.docker_logs_path, "w+") as f:
        try:
            subprocess.check_call(self.base_cmd + ['logs'], stdout=f)  # STYLE_CHECK_ALLOW_SUBPROCESS_CHECK_CALL
        except Exception as e:
            print("Unable to get logs from docker. {}".format(repr(e)))
        f.seek(0)
        for line in f:
            if SANITIZER_SIGN in line:
                # The text before the first '|' on the line is reported as
                # the instance name.
                sanitizer_assert_instance = line.split('|')[0].strip()
                break

    if kill:
        try:
            subprocess_check_call(self.base_cmd + ['stop', '--timeout', '20'])
        except Exception as e:
            print("Kill command failed during shutdown. {}".format(repr(e)))
            print("Trying to kill forcefully")
            subprocess_check_call(self.base_cmd + ['kill'])

    try:
        subprocess_check_call(self.base_cmd + ['down', '--volumes', '--remove-orphans'])
    except Exception as e:
        print("Down + remove orphans failed during shutdown. {}".format(repr(e)))

    self.is_up = False

    self.docker_client = None

    for instance in list(self.instances.values()):
        instance.docker_client = None
        instance.ip_address = None
        instance.client = None

    if not self.zookeeper_use_tmpfs:
        for i in range(1, 4):
            zk_data_path = self.instances_dir + '/zkdata' + str(i)
            zk_log_data_path = self.instances_dir + '/zklog' + str(i)
            if os.path.exists(zk_data_path):
                shutil.rmtree(zk_data_path)
            if os.path.exists(zk_log_data_path):
                shutil.rmtree(zk_log_data_path)

    if sanitizer_assert_instance is not None:
        raise Exception(
            "Sanitizer assert found in {} for instance {}".format(self.docker_logs_path, sanitizer_assert_instance))
2019-11-14 16:00:02 +00:00
2020-03-26 14:43:22 +00:00
def pause_container(self, instance_name):
    """Run the compose ``pause`` command for the service ``instance_name``."""
    pause_cmd = self.base_cmd + ['pause', instance_name]
    subprocess_check_call(pause_cmd)
2020-09-16 04:26:10 +00:00
2020-03-26 14:43:22 +00:00
# subprocess_check_call(self.base_cmd + ['kill', '-s SIGSTOP', instance_name])
def unpause_container(self, instance_name):
    """Run the compose ``unpause`` command for the service ``instance_name``."""
    unpause_cmd = self.base_cmd + ['unpause', instance_name]
    subprocess_check_call(unpause_cmd)
2020-09-16 04:26:10 +00:00
2020-03-26 14:43:22 +00:00
# subprocess_check_call(self.base_cmd + ['kill', '-s SIGCONT', instance_name])
2017-05-19 18:54:05 +00:00
2019-06-18 07:20:14 +00:00
def open_bash_shell(self, instance_name):
    """Run ``/bin/bash`` inside ``instance_name`` via the compose ``exec`` command.

    The command is joined into a single string and handed to ``os.system``.
    """
    exec_args = self.base_cmd + ['exec', instance_name, '/bin/bash']
    shell_line = ' '.join(exec_args)
    os.system(shell_line)
2017-05-19 18:54:05 +00:00
2018-01-25 18:14:37 +00:00
def get_kazoo_client(self, zoo_instance_name):
    """Return a started ``KazooClient`` connected to the given instance's IP."""
    zk_host = self.get_instance_ip(zoo_instance_name)
    client = KazooClient(hosts=zk_host)
    client.start()
    return client
2019-11-20 11:56:38 +00:00
def run_kazoo_commands_with_retries(self, kazoo_callback, zoo_instance_name='zoo1', repeats=1, sleep_for=1):
    """Invoke ``kazoo_callback`` with a fresh Kazoo client, retrying on failure.

    The first ``repeats - 1`` attempts swallow ``KazooException`` (printing it
    and sleeping ``sleep_for`` seconds); the final attempt lets any exception
    propagate to the caller.
    """
    guarded_attempts = repeats - 1
    while guarded_attempts > 0:
        guarded_attempts -= 1
        try:
            kazoo_callback(self.get_kazoo_client(zoo_instance_name))
            return
        except KazooException as e:
            print(repr(e))
            time.sleep(sleep_for)

    # Last attempt: not guarded, so the caller sees the failure.
    kazoo_callback(self.get_kazoo_client(zoo_instance_name))
2017-08-30 16:25:34 +00:00
def add_zookeeper_startup_command(self, command):
    """Append ``command`` to ``self.pre_zookeeper_commands``."""
    self.pre_zookeeper_commands += [command]
2017-08-30 16:25:34 +00:00
2020-09-10 04:00:33 +00:00
def stop_zookeeper_nodes(self, zk_nodes):
    """Run the ZooKeeper compose ``stop`` command for each node, in order."""
    for node in zk_nodes:
        logging.info("Stopping zookeeper node: %s", node)
        stop_cmd = self.base_zookeeper_cmd + ["stop", node]
        subprocess_check_call(stop_cmd)
def start_zookeeper_nodes(self, zk_nodes):
    """Run the ZooKeeper compose ``start`` command for each node, in order."""
    for node in zk_nodes:
        logging.info("Starting zookeeper node: %s", node)
        start_cmd = self.base_zookeeper_cmd + ["start", node]
        subprocess_check_call(start_cmd)
2017-08-30 16:25:34 +00:00
2018-11-22 15:59:00 +00:00
# Command line that launches the ClickHouse server with explicit config and log paths.
CLICKHOUSE_START_COMMAND = (
    "clickhouse server"
    " --config-file=/etc/clickhouse-server/config.xml"
    " --log-file=/var/log/clickhouse-server/clickhouse-server.log"
    " --errorlog-file=/var/log/clickhouse-server/clickhouse-server.err.log"
)

# Starts the server as a daemon, then runs `tail -f /dev/null` in the same shell.
CLICKHOUSE_STAY_ALIVE_COMMAND = 'bash -c "{} --daemon; tail -f /dev/null"'.format(CLICKHOUSE_START_COMMAND)
2017-05-19 18:54:05 +00:00
# Text template for a docker-compose file describing one service.
# The brace-delimited fields ({name}, {image}, {tag}, ...) are presumably filled
# in with str.format by code outside this chunk -- TODO confirm against the caller.
DOCKER_COMPOSE_TEMPLATE = '''
2020-05-19 15:27:10 +00:00
version : ' 2.3 '
2017-05-19 18:54:05 +00:00
services :
{ name } :
2020-09-01 06:38:23 +00:00
image : { image } : { tag }
2017-08-02 14:42:35 +00:00
hostname : { hostname }
2017-05-19 18:54:05 +00:00
volumes :
2020-08-12 08:55:04 +00:00
- { instance_config_dir } : / etc / clickhouse - server /
2017-05-19 18:54:05 +00:00
- { db_dir } : / var / lib / clickhouse /
- { logs_dir } : / var / log / clickhouse - server /
2020-09-29 08:56:37 +00:00
- / etc / passwd : / etc / passwd : ro
2019-06-20 16:25:32 +00:00
{ binary_volume }
{ odbc_bridge_volume }
2018-08-22 15:42:27 +00:00
{ odbc_ini_path }
2020-09-29 08:56:37 +00:00
{ keytab_path }
{ krb5_conf }
2018-11-22 15:59:00 +00:00
entrypoint : { entrypoint_cmd }
2019-05-22 16:40:30 +00:00
tmpfs : { tmpfs }
2018-12-27 19:42:25 +00:00
cap_add :
- SYS_PTRACE
2020-10-27 12:24:10 +00:00
- NET_ADMIN
2017-05-19 18:54:05 +00:00
depends_on : { depends_on }
2019-02-21 17:34:19 +00:00
user : ' {user} '
2018-07-28 14:38:08 +00:00
env_file :
- { env_file }
2019-02-21 17:34:19 +00:00
security_opt :
- label : disable
2020-08-12 08:55:04 +00:00
dns_opt :
2020-09-02 08:07:46 +00:00
- attempts : 2
2020-08-12 08:55:04 +00:00
- timeout : 1
2020-09-02 08:07:46 +00:00
- inet6
- rotate
2018-12-27 15:55:51 +00:00
{ networks }
{ app_net }
{ ipv4_address }
{ ipv6_address }
2019-12-18 13:42:39 +00:00
{ net_aliases }
{ net_alias1 }
2017-05-19 18:54:05 +00:00
'''
2019-06-04 20:59:31 +00:00
2017-05-19 18:54:05 +00:00
class ClickHouseInstance :
2018-08-22 15:42:27 +00:00
2017-05-19 18:54:05 +00:00
def __init__ (
2020-09-16 04:26:10 +00:00
self , cluster , base_path , name , base_config_dir , custom_main_configs , custom_user_configs ,
custom_dictionaries ,
2020-09-10 10:02:46 +00:00
macros , with_zookeeper , zookeeper_config_path , with_mysql , with_kafka , with_kerberized_kafka , with_rabbitmq , with_kerberized_hdfs ,
with_mongo , with_redis , with_minio ,
2020-09-16 04:26:10 +00:00
with_cassandra , server_bin_path , odbc_bridge_bin_path , clickhouse_path_dir , with_odbc_drivers ,
hostname = None , env_variables = None ,
2020-09-01 06:38:23 +00:00
image = " yandex/clickhouse-integration-test " , tag = " latest " ,
2020-04-11 10:57:13 +00:00
stay_alive = False , ipv4_address = None , ipv6_address = None , with_installed_binary = False , tmpfs = None ) :
2017-05-19 18:54:05 +00:00
self . name = name
2020-04-11 10:57:13 +00:00
self . base_cmd = cluster . base_cmd
2017-05-30 11:49:17 +00:00
self . docker_id = cluster . get_instance_docker_id ( self . name )
self . cluster = cluster
2017-08-02 14:42:35 +00:00
self . hostname = hostname if hostname is not None else self . name
2017-05-30 11:49:17 +00:00
2020-04-11 10:57:13 +00:00
self . tmpfs = tmpfs or [ ]
2020-08-12 08:55:04 +00:00
self . base_config_dir = p . abspath ( p . join ( base_path , base_config_dir ) ) if base_config_dir else None
2017-05-30 11:49:17 +00:00
self . custom_main_config_paths = [ p . abspath ( p . join ( base_path , c ) ) for c in custom_main_configs ]
self . custom_user_config_paths = [ p . abspath ( p . join ( base_path , c ) ) for c in custom_user_configs ]
2020-08-12 08:55:04 +00:00
self . custom_dictionaries_paths = [ p . abspath ( p . join ( base_path , c ) ) for c in custom_dictionaries ]
2017-06-15 20:08:26 +00:00
self . clickhouse_path_dir = p . abspath ( p . join ( base_path , clickhouse_path_dir ) ) if clickhouse_path_dir else None
2020-09-29 08:56:37 +00:00
self . kerberos_secrets_dir = p . abspath ( p . join ( base_path , ' secrets ' ) )
2018-07-25 16:00:51 +00:00
self . macros = macros if macros is not None else { }
2017-05-19 18:54:05 +00:00
self . with_zookeeper = with_zookeeper
2017-08-30 16:25:34 +00:00
self . zookeeper_config_path = zookeeper_config_path
2017-05-19 18:54:05 +00:00
self . server_bin_path = server_bin_path
2019-01-29 17:17:31 +00:00
self . odbc_bridge_bin_path = odbc_bridge_bin_path
2017-05-19 18:54:05 +00:00
2018-05-14 11:10:07 +00:00
self . with_mysql = with_mysql
2018-07-18 05:22:01 +00:00
self . with_kafka = with_kafka
2020-09-29 08:56:37 +00:00
self . with_kerberized_kafka = with_kerberized_kafka
2020-05-20 06:22:12 +00:00
self . with_rabbitmq = with_rabbitmq
2020-09-10 10:02:46 +00:00
self . with_kerberized_hdfs = with_kerberized_hdfs
2019-02-25 10:45:22 +00:00
self . with_mongo = with_mongo
2019-03-21 18:10:55 +00:00
self . with_redis = with_redis
2019-11-20 11:56:38 +00:00
self . with_minio = with_minio
2020-05-19 02:21:27 +00:00
self . with_cassandra = with_cassandra
2018-05-14 11:10:07 +00:00
2017-07-26 12:31:55 +00:00
self . path = p . join ( self . cluster . instances_dir , name )
2021-01-27 11:26:49 +00:00
self . docker_compose_path = p . join ( self . path , ' docker-compose.yml ' )
2020-04-11 10:57:13 +00:00
self . env_variables = env_variables or { }
2018-08-22 15:42:27 +00:00
if with_odbc_drivers :
2020-08-12 08:55:04 +00:00
self . odbc_ini_path = self . path + " /odbc.ini:/etc/odbc.ini "
2018-08-22 15:42:27 +00:00
self . with_mysql = True
else :
self . odbc_ini_path = " "
2017-05-19 18:54:05 +00:00
2020-09-10 10:02:46 +00:00
if with_kerberized_kafka or with_kerberized_hdfs :
2020-09-29 08:56:37 +00:00
self . keytab_path = ' - ' + os . path . dirname ( self . docker_compose_path ) + " /secrets:/tmp/keytab "
2020-10-30 19:40:16 +00:00
self . krb5_conf = ' - ' + os . path . dirname ( self . docker_compose_path ) + " /secrets/krb.conf:/etc/krb5.conf:ro "
2020-09-29 08:56:37 +00:00
else :
self . keytab_path = " "
self . krb5_conf = " "
2017-05-23 17:13:36 +00:00
self . docker_client = None
2017-05-19 18:54:05 +00:00
self . ip_address = None
self . client = None
2019-11-20 11:56:38 +00:00
self . default_timeout = 20.0 # 20 sec
2018-08-22 15:42:27 +00:00
self . image = image
2020-09-01 06:38:23 +00:00
self . tag = tag
2018-11-22 15:59:00 +00:00
self . stay_alive = stay_alive
2018-12-27 15:55:51 +00:00
self . ipv4_address = ipv4_address
self . ipv6_address = ipv6_address
2019-06-20 16:25:32 +00:00
self . with_installed_binary = with_installed_binary
2017-05-30 11:49:17 +00:00
2020-06-22 13:10:25 +00:00
def is_built_with_thread_sanitizer ( self ) :
build_opts = self . query ( " SELECT value FROM system.build_options WHERE name = ' CXX_FLAGS ' " )
return " -fsanitize=thread " in build_opts
2020-11-23 15:18:09 +00:00
def is_built_with_address_sanitizer ( self ) :
build_opts = self . query ( " SELECT value FROM system.build_options WHERE name = ' CXX_FLAGS ' " )
return " -fsanitize=address " in build_opts
2017-08-14 01:29:19 +00:00
# Connects to the instance via clickhouse-client, sends a query (1st argument) and returns the answer
2020-09-16 04:26:10 +00:00
def query ( self , sql , stdin = None , timeout = None , settings = None , user = None , password = None , database = None ,
ignore_error = False ) :
return self . client . query ( sql , stdin = stdin , timeout = timeout , settings = settings , user = user , password = password ,
database = database , ignore_error = ignore_error )
2018-09-03 14:06:00 +00:00
2020-09-16 04:26:10 +00:00
def query_with_retry ( self , sql , stdin = None , timeout = None , settings = None , user = None , password = None , database = None ,
ignore_error = False ,
2019-11-20 11:56:38 +00:00
retry_count = 20 , sleep_time = 0.5 , check_callback = lambda x : True ) :
2018-09-03 14:06:00 +00:00
result = None
for i in range ( retry_count ) :
try :
2020-09-16 04:26:10 +00:00
result = self . query ( sql , stdin = stdin , timeout = timeout , settings = settings , user = user , password = password ,
database = database , ignore_error = ignore_error )
2018-09-03 14:06:00 +00:00
if check_callback ( result ) :
return result
time . sleep ( sleep_time )
except Exception as ex :
2020-10-02 16:54:07 +00:00
print ( " Retry {} got exception {} " . format ( i + 1 , ex ) )
2018-09-03 14:06:00 +00:00
time . sleep ( sleep_time )
if result is not None :
return result
raise Exception ( " Can ' t execute query {} " . format ( sql ) )
2017-05-30 11:49:17 +00:00
2017-07-26 12:31:55 +00:00
# Like query(), but doesn't wait for the response and returns a response handler
2017-05-30 11:49:17 +00:00
def get_query_request ( self , * args , * * kwargs ) :
return self . client . get_query_request ( * args , * * kwargs )
2019-04-07 00:31:20 +00:00
# Connects to the instance via clickhouse-client, sends a query (1st argument), expects an error and return its code
2020-09-16 04:26:10 +00:00
def query_and_get_error ( self , sql , stdin = None , timeout = None , settings = None , user = None , password = None ,
database = None ) :
return self . client . query_and_get_error ( sql , stdin = stdin , timeout = timeout , settings = settings , user = user ,
password = password , database = database )
2019-04-07 00:31:20 +00:00
2019-07-17 11:55:18 +00:00
# The same as query_and_get_error but ignores successful query.
2020-09-16 04:26:10 +00:00
def query_and_get_answer_with_error ( self , sql , stdin = None , timeout = None , settings = None , user = None , password = None ,
database = None ) :
return self . client . query_and_get_answer_with_error ( sql , stdin = stdin , timeout = timeout , settings = settings ,
user = user , password = password , database = database )
2019-07-17 11:55:18 +00:00
2019-03-30 18:40:52 +00:00
# Connects to the instance via HTTP interface, sends a query and returns the answer
2020-03-29 13:21:26 +00:00
def http_query(self, sql, data=None, params=None, user=None, password=None, expect_fail_and_get_error=False):
    """Send ``sql`` to the instance's HTTP interface on port 8123.

    A POST is issued when ``data`` is truthy, otherwise a GET. Basic auth is
    used when ``user`` is given (with an empty password if none is supplied).
    The caller's ``params`` dict is never modified.

    Returns:
        The response text; or, with ``expect_fail_and_get_error``, a string
        of the form "<code> <reason>: <body>" describing the failure.

    Raises:
        Exception: if the outcome (success/failure) is not the expected one.
    """
    query_params = {} if params is None else params.copy()
    query_params["query"] = sql

    credentials = None
    if user:
        credentials = requests.auth.HTTPBasicAuth(user, password if password else '')

    endpoint = "http://" + self.ip_address + ":8123/?" + urllib.parse.urlencode(query_params)

    if data:
        response = requests.post(endpoint, data, auth=credentials)
    else:
        response = requests.get(endpoint, auth=credentials)

    def http_code_and_message():
        status = response.status_code
        return str(status) + " " + http.client.responses[status] + ": " + response.text

    if expect_fail_and_get_error:
        if response.ok:
            raise Exception("ClickHouse HTTP server is expected to fail, but succeeded: " + response.text)
        return http_code_and_message()

    if not response.ok:
        raise Exception("ClickHouse HTTP server returned " + http_code_and_message())
    return response.text
2020-03-29 13:21:26 +00:00
2019-11-14 02:20:06 +00:00
# Sends an arbitrary HTTP request to the instance's HTTP interface (port 8123) and returns the response object
def http_request(self, url, method='GET', params=None, data=None, headers=None):
    """Issue an HTTP request to ``url`` (relative to the instance's port 8123).

    Returns whatever ``requests.request`` returns.
    """
    full_url = "http://" + self.ip_address + ":8123/" + url
    request_kwargs = dict(method=method, url=full_url, params=params, data=data, headers=headers)
    return requests.request(**request_kwargs)
2019-11-08 12:37:31 +00:00
2020-03-29 13:21:26 +00:00
# Connects to the instance via HTTP interface, sends a query, expects an error and returns the error message
def http_query_and_get_error ( self , sql , data = None , params = None , user = None , password = None ) :
2020-09-16 04:26:10 +00:00
return self . http_query ( sql = sql , data = data , params = params , user = user , password = password ,
expect_fail_and_get_error = True )
2019-03-30 18:40:52 +00:00
2021-01-11 17:37:08 +00:00
def stop_clickhouse ( self , start_wait_sec = 5 , kill = False ) :
2019-03-14 13:39:47 +00:00
if not self . stay_alive :
2021-01-11 17:37:08 +00:00
raise Exception ( " clickhouse can be stopped only with stay_alive=True instance " )
2019-03-14 13:39:47 +00:00
2019-12-09 16:20:56 +00:00
self . exec_in_container ( [ " bash " , " -c " , " pkill {} clickhouse " . format ( " -9 " if kill else " " ) ] , user = ' root ' )
2021-01-11 17:37:08 +00:00
time . sleep ( start_wait_sec )
def start_clickhouse ( self , stop_wait_sec = 5 ) :
if not self . stay_alive :
raise Exception ( " clickhouse can be started again only with stay_alive=True instance " )
2019-06-20 16:25:32 +00:00
self . exec_in_container ( [ " bash " , " -c " , " {} --daemon " . format ( CLICKHOUSE_START_COMMAND ) ] , user = str ( os . getuid ( ) ) )
2020-02-27 21:02:06 +00:00
# wait start
from helpers . test_tools import assert_eq_with_retry
2021-01-11 17:37:08 +00:00
assert_eq_with_retry ( self , " select 1 " , " 1 " , retry_count = int ( stop_wait_sec / 0.5 ) , sleep_time = 0.5 )
def restart_clickhouse ( self , stop_start_wait_sec = 5 , kill = False ) :
self . stop_clickhouse ( stop_start_wait_sec , kill )
self . start_clickhouse ( stop_start_wait_sec )
2017-05-19 18:54:05 +00:00
2020-08-03 14:40:02 +00:00
def exec_in_container ( self , cmd , detach = False , nothrow = False , * * kwargs ) :
2020-05-08 10:53:12 +00:00
container_id = self . get_docker_handle ( ) . id
2020-08-03 14:40:02 +00:00
return self . cluster . exec_in_container ( container_id , cmd , detach , nothrow , * * kwargs )
2017-08-02 14:42:35 +00:00
2019-03-29 18:10:03 +00:00
def contains_in_log ( self , substring ) :
2019-11-20 11:56:38 +00:00
result = self . exec_in_container (
2020-02-14 19:11:51 +00:00
[ " bash " , " -c " , ' grep " {} " /var/log/clickhouse-server/clickhouse-server.log || true ' . format ( substring ) ] )
2019-03-29 18:10:03 +00:00
return len ( result ) > 0
2020-11-17 14:36:04 +00:00
def file_exists ( self , path ) :
return self . exec_in_container (
[ " bash " , " -c " , " echo $(if [ -e ' {} ' ]; then echo ' yes ' ; else echo ' no ' ; fi) " . format ( path ) ] ) == ' yes \n '
2019-02-21 17:34:19 +00:00
def copy_file_to_container ( self , local_path , dest_path ) :
2020-05-08 10:53:12 +00:00
container_id = self . get_docker_handle ( ) . id
return self . cluster . copy_file_to_container ( container_id , local_path , dest_path )
2019-02-21 17:34:19 +00:00
2019-06-21 08:03:13 +00:00
def get_process_pid ( self , process_name ) :
2021-01-12 17:18:40 +00:00
output = self . exec_in_container ( [ " bash " , " -c " ,
" ps ax | grep ' {} ' | grep -v ' grep ' | grep -v ' bash -c ' | awk ' {{ print $1}} ' " . format (
process_name ) ] )
2019-06-21 08:03:13 +00:00
if output :
try :
pid = int ( output . split ( ' \n ' ) [ 0 ] . strip ( ) )
return pid
except :
return None
return None
def restart_with_latest_version ( self , stop_start_wait_sec = 10 , callback_onstop = None , signal = 15 ) :
2019-06-20 16:25:32 +00:00
if not self . stay_alive :
raise Exception ( " Cannot restart not stay alive container " )
self . exec_in_container ( [ " bash " , " -c " , " pkill - {} clickhouse " . format ( signal ) ] , user = ' root ' )
2019-06-21 08:03:13 +00:00
retries = int ( stop_start_wait_sec / 0.5 )
local_counter = 0
# wait stop
while local_counter < retries :
if not self . get_process_pid ( " clickhouse server " ) :
break
time . sleep ( 0.5 )
local_counter + = 1
2019-12-17 18:07:13 +00:00
# force kill if server hangs
if self . get_process_pid ( " clickhouse server " ) :
2020-08-03 14:40:02 +00:00
# server can die before kill, so don't throw exception, it's expected
self . exec_in_container ( [ " bash " , " -c " , " pkill - {} clickhouse " . format ( 9 ) ] , nothrow = True , user = ' root ' )
2019-12-17 18:07:13 +00:00
2019-06-20 16:25:32 +00:00
if callback_onstop :
callback_onstop ( self )
2019-11-20 11:56:38 +00:00
self . exec_in_container (
[ " bash " , " -c " , " cp /usr/share/clickhouse_fresh /usr/bin/clickhouse && chmod 777 /usr/bin/clickhouse " ] ,
user = ' root ' )
self . exec_in_container ( [ " bash " , " -c " ,
" cp /usr/share/clickhouse-odbc-bridge_fresh /usr/bin/clickhouse-odbc-bridge && chmod 777 /usr/bin/clickhouse " ] ,
user = ' root ' )
2019-06-20 16:25:32 +00:00
self . exec_in_container ( [ " bash " , " -c " , " {} --daemon " . format ( CLICKHOUSE_START_COMMAND ) ] , user = str ( os . getuid ( ) ) )
2019-06-21 08:03:13 +00:00
from helpers . test_tools import assert_eq_with_retry
# wait start
assert_eq_with_retry ( self , " select 1 " , " 1 " , retry_count = retries )
2017-08-02 14:42:35 +00:00
2017-05-30 11:49:17 +00:00
def get_docker_handle ( self ) :
return self . docker_client . containers . get ( self . docker_id )
2017-05-19 18:54:05 +00:00
2017-05-30 11:49:17 +00:00
def stop(self):
    """Stop the Docker container backing this instance."""
    container = self.get_docker_handle()
    container.stop()
2017-05-30 11:49:17 +00:00
def start(self):
    """Start the Docker container backing this instance."""
    container = self.get_docker_handle()
    container.start()
def wait_for_start(self, deadline=None, timeout=None):
    """Block until something accepts TCP connections on port 9000 of the instance.

    The container status is re-read on every iteration so that a container
    that has exited is reported immediately instead of being polled forever.

    :param deadline: absolute point in time (on the ``time.time()`` scale)
        after which waiting is abandoned. ``None`` together with
        ``timeout=None`` means wait without a time limit.
    :param timeout: number of seconds to wait, counted from the moment of
        the call; when given it overrides ``deadline``.
    :raises Exception: if the container status matches the exited marker or
        the deadline passes; the message includes the container logs.
    """
    start_time = time.time()
    if timeout is not None:
        deadline = start_time + timeout

    # Errors that only mean "nothing is listening yet"; anything else is re-raised.
    retriable_errnos = (errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.ENETUNREACH)

    while True:
        handle = self.get_docker_handle()
        status = handle.status
        if status == ' exited ':
            raise Exception(
                " Instance ` {} ' failed to start. Container status: {} , logs: {} ".format(
                    self.name, status, handle.logs().decode(' utf-8 ')))

        current_time = time.time()
        if deadline is not None and current_time >= deadline:
            raise Exception(" Timed out while waiting for instance ` {} ' with ip address {} to start. "
                            " Container status: {} , logs: {} ".format(self.name, self.ip_address, status,
                                                                       handle.logs().decode(' utf-8 ')))

        # Without a deadline the socket is left in blocking mode (timeout None);
        # previously this subtraction ran unconditionally and raised TypeError.
        time_left = None if deadline is None else deadline - current_time

        # Repeatedly poll the instance address until there is something that listens there.
        # Usually it means that ClickHouse is ready to accept queries.
        # The socket is created outside the try so that ``finally`` never
        # refers to an unbound name if creation itself fails.
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.settimeout(time_left)
            sock.connect((self.ip_address, 9000))
            return
        except socket.timeout:
            continue
        except socket.error as e:
            if e.errno in retriable_errnos:
                time.sleep(0.1)
            else:
                raise
        finally:
            sock.close()
2017-05-30 11:49:17 +00:00
@staticmethod
def dict_to_xml(dictionary):
    """Render ``dictionary`` as an XML string using ``dict2xml``.

    The result is wrapped in a single root element and emitted with
    newlines between elements.
    """
    return dict2xml(dictionary, wrap=" yandex ", indent=" ", newlines=True)
2017-05-30 11:49:17 +00:00
2018-08-22 15:42:27 +00:00
@property
def odbc_drivers(self):
    """Return the ODBC data-source definitions for this instance.

    The result maps a driver label to a dict of ini settings (each with a
    ``DSN`` entry naming the section). An empty dict is returned when the
    instance has no ``odbc_ini_path``.
    """
    if not self.odbc_ini_path:
        return {}

    sqlite_settings = {
        " DSN ": " sqlite3_odbc ",
        " Database ": " /tmp/sqliteodbc ",
        " Driver ": " /usr/lib/x86_64-linux-gnu/odbc/libsqlite3odbc.so ",
        " Setup ": " /usr/lib/x86_64-linux-gnu/odbc/libsqlite3odbc.so ",
    }
    mysql_settings = {
        " DSN ": " mysql_odbc ",
        " Driver ": " /usr/lib/x86_64-linux-gnu/odbc/libmyodbc.so ",
        " Database ": " clickhouse ",
        " Uid ": " root ",
        " Pwd ": " clickhouse ",
        " Server ": " mysql1 ",
    }
    postgres_settings = {
        " DSN ": " postgresql_odbc ",
        " Database ": " postgres ",
        " UserName ": " postgres ",
        " Password ": " mysecretpassword ",
        " Port ": " 5432 ",
        " Servername ": " postgres1 ",
        " Protocol ": " 9.3 ",
        " ReadOnly ": " No ",
        " RowVersioning ": " No ",
        " ShowSystemTables ": " No ",
        " Driver ": " /usr/lib/x86_64-linux-gnu/odbc/psqlodbca.so ",
        " Setup ": " /usr/lib/x86_64-linux-gnu/odbc/libodbcpsqlS.so ",
        " ConnSettings ": " ",
    }
    return {
        " SQLite3 ": sqlite_settings,
        " MySQL ": mysql_settings,
        " PostgreSQL ": postgres_settings,
    }
def _create_odbc_config_file(self):
    """Write the ODBC ini file described by ``self.odbc_drivers``.

    The target is the part of ``self.odbc_ini_path`` before the first
    separator. Each driver becomes one section headed by its ``DSN``
    value, followed by one ``key = value`` line per remaining setting.
    """
    ini_path = self.odbc_ini_path.split(' : ')[0]
    with open(ini_path, ' w ') as ini_file:
        for dsn_settings in self.odbc_drivers.values():
            ini_file.write(" [ {} ] \n ".format(dsn_settings[" DSN "]))
            for option, setting in dsn_settings.items():
                # The DSN entry is the section header, not a setting line.
                if option == " DSN ":
                    continue
                ini_file.write(option + " = " + setting + " \n ")
2017-05-30 11:49:17 +00:00
2019-06-20 16:25:32 +00:00
def replace_config(self, path_to_config, replacement):
    """Overwrite ``path_to_config`` inside the container with ``replacement``.

    The text is written with a shell ``echo`` redirected to the target path.
    """
    shell_command = " echo ' {} ' > {} ".format(replacement, path_to_config)
    self.exec_in_container([" bash ", " -c ", shell_command])
2017-05-19 18:54:05 +00:00
def create_dir(self, destroy_dir=True):
    """Build the on-disk layout for this instance and write its docker-compose file.

    Creates the instance directory with its ``configs`` tree (base configs,
    ``conf.d``, ``config.d``, ``users.d``, ``dictionaries``), the database
    and logs directories, the env file and, when configured, the ODBC ini
    file, then renders ``DOCKER_COMPOSE_TEMPLATE`` into
    ``self.docker_compose_path``.

    :param destroy_dir: when true, any existing instance directory is removed
        first; when false and the directory already exists, nothing is done.
    """
    if destroy_dir:
        self.destroy_dir()
    elif p.exists(self.path):
        return

    os.makedirs(self.path)

    configs_root = p.abspath(p.join(self.path, ' configs '))
    os.makedirs(configs_root)

    print(" Copy common default production configuration from {} ".format(self.base_config_dir))
    for base_config_name in (' config.xml ', ' users.xml '):
        shutil.copyfile(p.join(self.base_config_dir, base_config_name), p.join(configs_root, base_config_name))

    print(" Create directory for configuration generated in this helper ")
    # used by all utils with any config
    shared_conf_dir = p.abspath(p.join(configs_root, ' conf.d '))
    os.mkdir(shared_conf_dir)

    print(" Create directory for common tests configuration ")
    # used by server with main config.xml
    self.config_d_dir = p.abspath(p.join(configs_root, ' config.d '))
    os.mkdir(self.config_d_dir)
    users_conf_dir = p.abspath(p.join(configs_root, ' users.d '))
    os.mkdir(users_conf_dir)
    dictionaries_conf_dir = p.abspath(p.join(configs_root, ' dictionaries '))
    os.mkdir(dictionaries_conf_dir)

    print(" Copy common configuration from helpers ")
    # The file is named with 0_ prefix to be processed before other configuration overloads.
    shutil.copy(p.join(HELPERS_DIR, ' 0_common_instance_config.xml '), self.config_d_dir)
    shutil.copy(p.join(HELPERS_DIR, ' 0_common_instance_users.xml '), users_conf_dir)
    if len(self.custom_dictionaries_paths) > 0:
        shutil.copy(p.join(HELPERS_DIR, ' 0_common_enable_dictionaries.xml '), self.config_d_dir)

    print(" Generate and write macros file ")
    instance_macros = self.macros.copy()
    instance_macros[' instance '] = self.name
    with open(p.join(shared_conf_dir, ' macros.xml '), ' w ') as macros_file:
        macros_file.write(self.dict_to_xml({" macros ": instance_macros}))

    # Put ZooKeeper config
    if self.with_zookeeper:
        shutil.copy(self.zookeeper_config_path, shared_conf_dir)

    if self.with_kerberized_kafka or self.with_kerberized_hdfs:
        shutil.copytree(self.kerberos_secrets_dir, p.abspath(p.join(self.path, ' secrets ')))

    # Copy config.d configs
    print(" Copy custom test config files {} to {} ".format(self.custom_main_config_paths, self.config_d_dir))
    for main_config_path in self.custom_main_config_paths:
        shutil.copy(main_config_path, self.config_d_dir)

    # Copy users.d configs
    for user_config_path in self.custom_user_config_paths:
        shutil.copy(user_config_path, users_conf_dir)

    # Copy dictionaries configs to configs/dictionaries
    for dictionary_path in self.custom_dictionaries_paths:
        shutil.copy(dictionary_path, dictionaries_conf_dir)

    database_dir = p.abspath(p.join(self.path, ' database '))
    print(" Setup database dir {} ".format(database_dir))
    if self.clickhouse_path_dir is None:
        os.mkdir(database_dir)
    else:
        print(" Database files taken from {} ".format(self.clickhouse_path_dir))
        shutil.copytree(self.clickhouse_path_dir, database_dir)
        print(" Database copied from {} to {} ".format(self.clickhouse_path_dir, database_dir))

    log_dir = p.abspath(p.join(self.path, ' logs '))
    print(" Setup logs dir {} ".format(log_dir))
    os.mkdir(log_dir)

    # Services this instance depends on, in the order they are listed in the
    # compose file; each group is added only when its feature flag is set.
    service_groups = (
        (self.with_mysql, [" mysql1 "]),
        (self.with_kafka, [" kafka1 ", " schema-registry "]),
        (self.with_kerberized_kafka, [" kerberized_kafka1 "]),
        (self.with_kerberized_hdfs, [" kerberizedhdfs1 "]),
        (self.with_rabbitmq, [" rabbitmq1 "]),
        (self.with_zookeeper, [" zoo1 ", " zoo2 ", " zoo3 "]),
        (self.with_minio, [" minio1 "]),
    )
    required_services = []
    for enabled, service_names in service_groups:
        if enabled:
            required_services.extend(service_names)

    env_file_path = _create_env_file(os.path.dirname(self.docker_compose_path), self.env_variables)
    print(" Env {} stored in {} ".format(self.env_variables, env_file_path))

    odbc_volume = " "
    if self.odbc_ini_path:
        self._create_odbc_config_file()
        odbc_volume = ' - ' + self.odbc_ini_path

    startup_cmd = CLICKHOUSE_STAY_ALIVE_COMMAND if self.stay_alive else CLICKHOUSE_START_COMMAND
    print(" Entrypoint cmd: {} ".format(startup_cmd))

    # Network section fragments; each stays at the blank placeholder unless
    # the corresponding setting is present on the instance.
    has_ipv4 = self.ipv4_address is not None
    has_ipv6 = self.ipv6_address is not None
    has_alias = self.hostname != self.name
    needs_network_section = has_ipv4 or has_ipv6 or has_alias

    networks_line = " networks: " if needs_network_section else " "
    app_net_line = " default: " if needs_network_section else " "
    ipv4_line = " ipv4_address: " + self.ipv4_address if has_ipv4 else " "
    ipv6_line = " ipv6_address: " + self.ipv6_address if has_ipv6 else " "
    aliases_line = " aliases: " if has_alias else " "
    alias_entry = " - " + self.hostname if has_alias else " "

    # With an installed binary the provided binaries are mounted aside as
    # "*_fresh" files; otherwise they are mounted directly over /usr/bin.
    if self.with_installed_binary:
        server_mount_target = " :/usr/share/clickhouse_fresh "
        bridge_mount_target = " :/usr/share/clickhouse-odbc-bridge_fresh "
    else:
        server_mount_target = " :/usr/bin/clickhouse "
        bridge_mount_target = " :/usr/bin/clickhouse-odbc-bridge "
    server_volume = " - " + self.server_bin_path + server_mount_target
    bridge_volume = " - " + self.odbc_bridge_bin_path + bridge_mount_target

    with open(self.docker_compose_path, ' w ') as compose_file:
        compose_text = DOCKER_COMPOSE_TEMPLATE.format(
            image=self.image,
            tag=self.tag,
            name=self.name,
            hostname=self.hostname,
            binary_volume=server_volume,
            odbc_bridge_volume=bridge_volume,
            instance_config_dir=configs_root,
            config_d_dir=self.config_d_dir,
            db_dir=database_dir,
            tmpfs=str(self.tmpfs),
            logs_dir=log_dir,
            depends_on=str(required_services),
            user=os.getuid(),
            env_file=env_file_path,
            odbc_ini_path=odbc_volume,
            keytab_path=self.keytab_path,
            krb5_conf=self.krb5_conf,
            entrypoint_cmd=startup_cmd,
            networks=networks_line,
            app_net=app_net_line,
            ipv4_address=ipv4_line,
            ipv6_address=ipv6_line,
            net_aliases=aliases_line,
            net_alias1=alias_entry,
        )
        compose_file.write(compose_text)
2017-05-19 18:54:05 +00:00
def destroy_dir(self):
    """Remove the instance directory tree if it exists; otherwise do nothing."""
    if not p.exists(self.path):
        return
    shutil.rmtree(self.path)
2020-02-06 12:18:19 +00:00
class ClickHouseKiller:
    """Context manager that keeps the ClickHouse server down for the duration of a block.

    Entering kills the server on the wrapped node; leaving starts it again,
    whether or not the block raised. Exceptions from the block propagate.
    """

    def __init__(self, clickhouse_node):
        # Node whose server is stopped on enter and restarted on exit.
        self.clickhouse_node = clickhouse_node

    def __enter__(self):
        node = self.clickhouse_node
        node.stop_clickhouse(kill=True)

    def __exit__(self, exc_type, exc_val, exc_tb):
        node = self.clickhouse_node
        node.start_clickhouse()