2017-05-19 18:54:05 +00:00
import os
import os . path as p
2017-05-23 14:24:04 +00:00
import pwd
2017-05-19 18:54:05 +00:00
import re
import subprocess
import shutil
2017-05-30 11:49:17 +00:00
import distutils . dir_util
2017-05-19 18:54:05 +00:00
import socket
import time
import errno
2017-05-30 11:49:17 +00:00
from dicttoxml import dicttoxml
2018-08-22 15:42:27 +00:00
import pymysql
2017-05-30 11:49:17 +00:00
import xml . dom . minidom
2018-01-25 18:14:37 +00:00
from kazoo . client import KazooClient
from kazoo . exceptions import KazooException
2018-10-15 14:49:23 +00:00
import psycopg2
2018-12-05 13:24:45 +00:00
import requests
2019-02-21 17:34:19 +00:00
import base64
2019-02-25 10:45:22 +00:00
import pymongo
2019-03-30 18:40:52 +00:00
import urllib
2017-05-19 18:54:05 +00:00
import docker
2017-08-30 16:25:34 +00:00
from docker . errors import ContainerError
2017-05-19 18:54:05 +00:00
2017-07-24 20:12:59 +00:00
from . client import Client , CommandRequest
2018-12-05 13:24:45 +00:00
from . hdfs_api import HDFSApi
2017-05-19 18:54:05 +00:00
# Directory that contains this helper module; the shared docker-compose files
# and the default ZooKeeper config are looked up relative to it.
HELPERS_DIR = p.dirname(__file__)

# Default name of the environment file written by _create_env_file().
DEFAULT_ENV_NAME = 'env_file'
2017-05-19 18:54:05 +00:00
2018-07-28 14:38:08 +00:00
def _create_env_file(path, variables, fname=DEFAULT_ENV_NAME):
    """Write `variables` as NAME=value lines into the file `fname` inside `path`.

    path - directory in which the file is created (it must already exist)
    variables - dict mapping variable names to string values
    fname - name of the file to write; an existing file is overwritten

    Returns the full path of the written file.
    """
    full_path = os.path.join(path, fname)
    with open(full_path, 'w') as f:
        f.writelines("=".join([var, value]) + "\n" for var, value in variables.items())
    return full_path
2018-09-28 14:53:20 +00:00
def subprocess_check_call(args):
    """Run the command `args` (a list of strings) and raise CalledProcessError on a non-zero exit code."""
    # Uncomment for debugging
    # print('run:', ' '.join(args))
    subprocess.check_call(args)
def subprocess_call(args):
    """Run the command `args` (a list of strings) and return its exit code.

    Unlike subprocess_check_call() a non-zero exit code does not raise.
    The exit code is returned so that callers can branch on success
    (0 is falsy, so `if not subprocess_call(...)` means "the command succeeded").
    """
    # Uncomment for debugging
    # print('run:', ' '.join(args))
    return subprocess.call(args)
2019-01-29 17:17:31 +00:00
def get_odbc_bridge_path():
    """Return the path of the clickhouse-odbc-bridge binary to use.

    Resolution order:
    1. the CLICKHOUSE_TESTS_ODBC_BRIDGE_BIN_PATH environment variable;
    2. `clickhouse-odbc-bridge` next to the binary named by CLICKHOUSE_TESTS_SERVER_BIN_PATH;
    3. /usr/bin/clickhouse-odbc-bridge.
    """
    path = os.environ.get('CLICKHOUSE_TESTS_ODBC_BRIDGE_BIN_PATH')
    if path is not None:
        return path

    server_path = os.environ.get('CLICKHOUSE_TESTS_SERVER_BIN_PATH')
    if server_path is not None:
        return os.path.join(os.path.dirname(server_path), 'clickhouse-odbc-bridge')

    return '/usr/bin/clickhouse-odbc-bridge'
2017-05-19 18:54:05 +00:00
class ClickHouseCluster:
    """ClickHouse cluster with several instances and (possibly) ZooKeeper.

    Add instances with several calls to add_instance(), then start them with the start() call.

    Directories for instances are created in the directory of base_path. After cluster is started,
    these directories will contain logs, database files, docker-compose config, ClickHouse configs etc.
    """

    def __init__(self, base_path, name=None, base_configs_dir=None, server_bin_path=None, client_bin_path=None,
                 odbc_bridge_bin_path=None, zookeeper_config_path=None, custom_dockerd_host=None):
        """Prepare (but do not start) a cluster rooted in the directory of `base_path`.

        base_path - path of a file; its directory becomes the cluster base directory
        name - optional cluster name, appended to the project name and the instances directory
        base_configs_dir, server_bin_path, client_bin_path, odbc_bridge_bin_path -
            explicit locations; when omitted the CLICKHOUSE_TESTS_* environment variables
            and then the /etc, /usr/bin defaults are used
        zookeeper_config_path - path relative to the base directory; defaults to
            zookeeper_config.xml in the helpers directory
        custom_dockerd_host - passed to docker-compose as --host; defaults to the
            CLICKHOUSE_TESTS_DOCKERD_HOST environment variable
        """
        self.base_dir = p.dirname(base_path)
        self.name = name if name is not None else ''

        self.base_configs_dir = base_configs_dir or os.environ.get('CLICKHOUSE_TESTS_BASE_CONFIG_DIR', '/etc/clickhouse-server/')
        self.server_bin_path = p.realpath(server_bin_path or os.environ.get('CLICKHOUSE_TESTS_SERVER_BIN_PATH', '/usr/bin/clickhouse'))
        self.odbc_bridge_bin_path = p.realpath(odbc_bridge_bin_path or get_odbc_bridge_path())
        self.client_bin_path = p.realpath(client_bin_path or os.environ.get('CLICKHOUSE_TESTS_CLIENT_BIN_PATH', '/usr/bin/clickhouse-client'))
        if zookeeper_config_path:
            self.zookeeper_config_path = p.join(self.base_dir, zookeeper_config_path)
        else:
            self.zookeeper_config_path = p.join(HELPERS_DIR, 'zookeeper_config.xml')

        self.project_name = pwd.getpwuid(os.getuid()).pw_name + p.basename(self.base_dir) + self.name
        # docker-compose removes everything non-alphanumeric from project names so we do it too.
        self.project_name = re.sub(r'[^a-z0-9]', '', self.project_name.lower())
        self.instances_dir = p.join(self.base_dir, '_instances' + ('' if not self.name else '_' + self.name))

        custom_dockerd_host = custom_dockerd_host or os.environ.get('CLICKHOUSE_TESTS_DOCKERD_HOST')
        self.docker_api_version = os.environ.get("DOCKER_API_VERSION")

        # Command prefix that addresses the whole cluster; every compose file
        # registered by add_instance() is appended to it as a --file argument.
        self.base_cmd = ['docker-compose']
        if custom_dockerd_host:
            self.base_cmd += ['--host', custom_dockerd_host]
        self.base_cmd += ['--project-directory', self.base_dir, '--project-name', self.project_name]

        # Per-service command prefixes; each stays empty until the service is requested.
        self.base_zookeeper_cmd = None
        self.base_mysql_cmd = []
        self.base_postgres_cmd = []
        self.base_kafka_cmd = []
        self.base_hdfs_cmd = []
        self.base_mongo_cmd = []
        self.pre_zookeeper_commands = []

        self.instances = {}
        self.with_zookeeper = False
        self.with_mysql = False
        self.with_postgres = False
        self.with_kafka = False
        self.with_odbc_drivers = False
        self.with_hdfs = False
        self.with_mongo = False
        self.with_net_trics = False

        self.docker_client = None
        self.is_up = False

    def get_client_cmd(self):
        """Return the client command line; the multi-tool `clickhouse` binary gets the ` client` subcommand."""
        cmd = self.client_bin_path
        if p.basename(cmd) == 'clickhouse':
            cmd += " client"
        return cmd

    def _add_service_compose(self, compose_file_name):
        """Append a helper compose file to base_cmd and return a standalone command prefix for that service."""
        compose_path = p.join(HELPERS_DIR, compose_file_name)
        self.base_cmd.extend(['--file', compose_path])
        return ['docker-compose', '--project-directory', self.base_dir, '--project-name',
                self.project_name, '--file', compose_path]

    def add_instance(self, name, config_dir=None, main_configs=None, user_configs=None, macros=None,
                     with_zookeeper=False, with_mysql=False, with_kafka=False, clickhouse_path_dir=None,
                     with_odbc_drivers=False, with_postgres=False, with_hdfs=False, with_mongo=False,
                     hostname=None, env_variables=None, image="yandex/clickhouse-integration-test",
                     stay_alive=False, ipv4_address=None, ipv6_address=None):
        """Add an instance to the cluster.

        name - the name of the instance directory and the value of the 'instance' macro in ClickHouse.
        config_dir - a directory with config files which content will be copied to /etc/clickhouse-server/ directory
        main_configs - a list of config files that will be added to config.d/ directory
        user_configs - a list of config files that will be added to users.d/ directory
        with_zookeeper - if True, add ZooKeeper configuration to configs and ZooKeeper instances to the cluster.
        with_mysql, with_postgres, with_kafka, with_hdfs, with_mongo - if True, add the
            corresponding helper service to the cluster (each service is added only once).
        with_odbc_drivers - if True, also adds the MySQL and PostgreSQL services.
        ipv4_address, ipv6_address - if either is given, docker_compose_net.yml is added to the cluster.

        Returns the created ClickHouseInstance.
        Raises Exception if the cluster is already up or the name is already taken.
        """
        if self.is_up:
            raise Exception("Can't add instance %s: cluster is already up!" % name)

        if name in self.instances:
            raise Exception("Can't add instance `%s': there is already an instance with the same name!" % name)

        # Fresh containers per call: mutable defaults would be shared between instances.
        main_configs = [] if main_configs is None else main_configs
        user_configs = [] if user_configs is None else user_configs
        macros = {} if macros is None else macros
        env_variables = {} if env_variables is None else env_variables

        instance = ClickHouseInstance(
            self, self.base_dir, name, config_dir, main_configs, user_configs, macros, with_zookeeper,
            self.zookeeper_config_path, with_mysql, with_kafka, with_mongo, self.base_configs_dir, self.server_bin_path,
            self.odbc_bridge_bin_path, clickhouse_path_dir, with_odbc_drivers, hostname=hostname,
            env_variables=env_variables, image=image, stay_alive=stay_alive, ipv4_address=ipv4_address, ipv6_address=ipv6_address)

        self.instances[name] = instance
        net_compose_path = p.join(HELPERS_DIR, 'docker_compose_net.yml')
        if ipv4_address is not None or ipv6_address is not None:
            self.with_net_trics = True
            self.base_cmd.extend(['--file', net_compose_path])

        self.base_cmd.extend(['--file', instance.docker_compose_path])

        # Service command prefixes created by THIS call; they receive the
        # network compose file below when static addresses are in use.
        cmds = []
        if with_zookeeper and not self.with_zookeeper:
            self.with_zookeeper = True
            self.base_zookeeper_cmd = self._add_service_compose('docker_compose_zookeeper.yml')
            cmds.append(self.base_zookeeper_cmd)

        if with_mysql and not self.with_mysql:
            self.with_mysql = True
            self.base_mysql_cmd = self._add_service_compose('docker_compose_mysql.yml')
            cmds.append(self.base_mysql_cmd)

        if with_postgres and not self.with_postgres:
            self.with_postgres = True
            self.base_postgres_cmd = self._add_service_compose('docker_compose_postgres.yml')
            cmds.append(self.base_postgres_cmd)

        if with_odbc_drivers and not self.with_odbc_drivers:
            self.with_odbc_drivers = True
            # The ODBC setup needs both the MySQL and the PostgreSQL services.
            if not self.with_mysql:
                self.with_mysql = True
                self.base_mysql_cmd = self._add_service_compose('docker_compose_mysql.yml')
                cmds.append(self.base_mysql_cmd)

            if not self.with_postgres:
                self.with_postgres = True
                self.base_postgres_cmd = self._add_service_compose('docker_compose_postgres.yml')
                cmds.append(self.base_postgres_cmd)

        if with_kafka and not self.with_kafka:
            self.with_kafka = True
            self.base_kafka_cmd = self._add_service_compose('docker_compose_kafka.yml')
            cmds.append(self.base_kafka_cmd)

        if with_hdfs and not self.with_hdfs:
            self.with_hdfs = True
            self.base_hdfs_cmd = self._add_service_compose('docker_compose_hdfs.yml')
            cmds.append(self.base_hdfs_cmd)

        if with_mongo and not self.with_mongo:
            self.with_mongo = True
            self.base_mongo_cmd = self._add_service_compose('docker_compose_mongo.yml')
            cmds.append(self.base_mongo_cmd)

        if self.with_net_trics:
            for cmd in cmds:
                cmd.extend(['--file', net_compose_path])

        return instance

    def get_instance_docker_id(self, instance_name):
        """Return the container name docker-compose gives to `instance_name`."""
        # According to how docker-compose names containers.
        return self.project_name + '_' + instance_name + '_1'

    def _replace(self, path, what, to):
        """Replace every occurrence of `what` with `to` in the text file `path`."""
        with open(path, 'r') as f:
            data = f.read()
        data = data.replace(what, to)
        with open(path, 'w') as f:
            f.write(data)

    def restart_instance_with_ip_change(self, node, new_ip):
        """Recreate the container of `node` with the static address `new_ip`.

        The address is substituted in the instance's docker-compose file, then the
        container is stopped, removed and recreated. Returns `node` once it accepts
        connections again.

        Raises Exception if the instance was added without a static address of the
        same family (IPv4 / IPv6) as `new_ip`.
        """
        # Only IPv6 literals contain a colon, so this also covers addresses without '::'.
        if ':' in new_ip:
            if node.ipv6_address is None:
                raise Exception("You should specify ipv6_address in add_instance method")
            self._replace(node.docker_compose_path, node.ipv6_address, new_ip)
            node.ipv6_address = new_ip
        else:
            if node.ipv4_address is None:
                raise Exception("You should specify ipv4_address in add_instance method")
            self._replace(node.docker_compose_path, node.ipv4_address, new_ip)
            node.ipv4_address = new_ip

        subprocess_check_call(self.base_cmd + ["stop", node.name])
        subprocess_check_call(self.base_cmd + ["rm", "--force", "--stop", node.name])
        subprocess_check_call(self.base_cmd + ["up", "--force-recreate", "--no-deps", "-d", node.name])
        node.ip_address = self.get_instance_ip(node.name)
        node.client = Client(node.ip_address, command=self.client_bin_path)
        start_deadline = time.time() + 20.0  # seconds
        node.wait_for_start(start_deadline)
        return node

    def get_instance_ip(self, instance_name):
        """Return the IP address of the container of `instance_name` on its first listed network."""
        docker_id = self.get_instance_docker_id(instance_name)
        handle = self.docker_client.containers.get(docker_id)
        # list() keeps this working where dict.values() returns a view rather than a list.
        return list(handle.attrs['NetworkSettings']['Networks'].values())[0]['IPAddress']

    def wait_mysql_to_start(self, timeout=60):
        """Poll MySQL on 127.0.0.1:3308 until a connection succeeds; raise after `timeout` seconds."""
        start = time.time()
        while time.time() - start < timeout:
            try:
                conn = pymysql.connect(user='root', password='clickhouse', host='127.0.0.1', port=3308)
                conn.close()
                print("Mysql Started")
                return
            except Exception as ex:
                print("Can't connect to MySQL " + str(ex))
                time.sleep(0.5)

        raise Exception("Cannot wait MySQL container")

    def wait_postgres_to_start(self, timeout=60):
        """Poll PostgreSQL on localhost until a connection succeeds; raise after `timeout` seconds."""
        start = time.time()
        while time.time() - start < timeout:
            try:
                conn_string = "host='localhost' user='postgres' password='mysecretpassword'"
                conn = psycopg2.connect(conn_string)
                conn.close()
                print("Postgres Started")
                return
            except Exception as ex:
                print("Can't connect to Postgres " + str(ex))
                time.sleep(0.5)

        raise Exception("Cannot wait Postgres container")

    def wait_zookeeper_to_start(self, timeout=60):
        """Poll zoo1, zoo2 and zoo3 until all of them answer a request; raise after `timeout` seconds."""
        start = time.time()
        while time.time() - start < timeout:
            try:
                for instance in ['zoo1', 'zoo2', 'zoo3']:
                    conn = self.get_kazoo_client(instance)
                    try:
                        conn.get_children('/')
                    finally:
                        # Do not leak one open session per probe.
                        conn.stop()
                print("All instances of ZooKeeper started")
                return
            except Exception as ex:
                print("Can't connect to ZooKeeper " + str(ex))
                time.sleep(0.5)

        raise Exception("Cannot wait ZooKeeper container")

    def wait_hdfs_to_start(self, timeout=60):
        """Poll HDFS by writing a small file until it succeeds; raise after `timeout` seconds."""
        hdfs_api = HDFSApi("root")
        start = time.time()
        while time.time() - start < timeout:
            try:
                hdfs_api.write_data("/somefilewithrandomname222", "1")
                print("Connected to HDFS and SafeMode disabled!")
                return
            except Exception as ex:
                print("Can't connect to HDFS " + str(ex))
                time.sleep(1)

        raise Exception("Can't wait HDFS to start")

    def wait_mongo_to_start(self, timeout=30):
        """Poll MongoDB on localhost:27018 until it lists its databases.

        NOTE: unlike the other wait_* methods this one returns silently when
        `timeout` seconds elapse without a successful connection.
        """
        connection_str = 'mongodb://{user}:{password}@{host}:{port}'.format(
            host='localhost', port='27018', user='root', password='clickhouse')
        connection = pymongo.MongoClient(connection_str)
        start = time.time()
        while time.time() - start < timeout:
            try:
                connection.database_names()
                print("Connected to Mongo dbs: {}".format(connection.database_names()))
                return
            except Exception as ex:
                print("Can't connect to Mongo " + str(ex))
                time.sleep(1)

    def start(self, destroy_dirs=True):
        """Start all helper services and all instances; does nothing if the cluster is already up.

        destroy_dirs - if True, remove the instances directory left by a previous run first.
        """
        if self.is_up:
            return

        # Just in case kill unstopped containers from previous launch
        try:
            if not subprocess_call(['docker-compose', 'kill']):
                subprocess_call(['docker-compose', 'down', '--volumes'])
        except Exception:
            # Best-effort cleanup: a failure here must not prevent the start.
            pass

        if destroy_dirs and p.exists(self.instances_dir):
            print("Removing instances dir {}".format(self.instances_dir))
            shutil.rmtree(self.instances_dir)

        for instance in self.instances.values():
            instance.create_dir(destroy_dir=destroy_dirs)

        self.docker_client = docker.from_env(version=self.docker_api_version)

        if self.with_zookeeper and self.base_zookeeper_cmd:
            subprocess_check_call(self.base_zookeeper_cmd + ['up', '-d', '--force-recreate'])
            for command in self.pre_zookeeper_commands:
                self.run_kazoo_commands_with_retries(command, repeats=5)
            self.wait_zookeeper_to_start(120)

        if self.with_mysql and self.base_mysql_cmd:
            subprocess_check_call(self.base_mysql_cmd + ['up', '-d', '--force-recreate'])
            self.wait_mysql_to_start(120)

        if self.with_postgres and self.base_postgres_cmd:
            subprocess_check_call(self.base_postgres_cmd + ['up', '-d', '--force-recreate'])
            self.wait_postgres_to_start(120)

        if self.with_kafka and self.base_kafka_cmd:
            subprocess_check_call(self.base_kafka_cmd + ['up', '-d', '--force-recreate'])
            self.kafka_docker_id = self.get_instance_docker_id('kafka1')

        if self.with_hdfs and self.base_hdfs_cmd:
            subprocess_check_call(self.base_hdfs_cmd + ['up', '-d', '--force-recreate'])
            self.wait_hdfs_to_start(120)

        if self.with_mongo and self.base_mongo_cmd:
            subprocess_check_call(self.base_mongo_cmd + ['up', '-d', '--force-recreate'])
            self.wait_mongo_to_start(30)

        # The helper services are already running, so do not recreate them.
        subprocess_check_call(self.base_cmd + ['up', '-d', '--no-recreate'])

        # One deadline shared by all instances.
        start_deadline = time.time() + 20.0  # seconds
        for instance in self.instances.values():
            instance.docker_client = self.docker_client
            instance.ip_address = self.get_instance_ip(instance.name)

            instance.wait_for_start(start_deadline)

            instance.client = Client(instance.ip_address, command=self.client_bin_path)

        self.is_up = True

    def shutdown(self, kill=True):
        """Bring the whole cluster down (killing the containers first if `kill`) and reset runtime state."""
        if kill:
            subprocess_check_call(self.base_cmd + ['kill'])
        subprocess_check_call(self.base_cmd + ['down', '--volumes', '--remove-orphans'])
        self.is_up = False

        self.docker_client = None

        for instance in self.instances.values():
            instance.docker_client = None
            instance.ip_address = None
            instance.client = None

    def get_kazoo_client(self, zoo_instance_name):
        """Return a started KazooClient connected to `zoo_instance_name`; the caller should stop it."""
        zk = KazooClient(hosts=self.get_instance_ip(zoo_instance_name))
        zk.start()
        return zk

    def run_kazoo_commands_with_retries(self, kazoo_callback, zoo_instance_name='zoo1', repeats=1, sleep_for=1):
        """Call `kazoo_callback(client)` with a fresh client, retrying on KazooException.

        The first `repeats - 1` attempts swallow KazooException and sleep `sleep_for`
        seconds; the final attempt lets any exception propagate.
        NOTE: the client handed to the callback is not stopped here.
        """
        for i in range(repeats - 1):
            try:
                kazoo_callback(self.get_kazoo_client(zoo_instance_name))
                return
            except KazooException as e:
                print(repr(e))
                time.sleep(sleep_for)

        kazoo_callback(self.get_kazoo_client(zoo_instance_name))

    def add_zookeeper_startup_command(self, command):
        """Register a callback that start() runs against ZooKeeper right after bringing it up."""
        self.pre_zookeeper_commands.append(command)
2017-08-30 16:25:34 +00:00
2018-11-22 15:59:00 +00:00
# Command line that runs the ClickHouse server inside an instance container.
CLICKHOUSE_START_COMMAND = "clickhouse server --config-file=/etc/clickhouse-server/config.xml --log-file=/var/log/clickhouse-server/clickhouse-server.log --errorlog-file=/var/log/clickhouse-server/clickhouse-server.err.log"

# Variant that daemonizes the server and then blocks on `tail -f /dev/null`,
# so the shell keeps running after the server process exits.
CLICKHOUSE_STAY_ALIVE_COMMAND = 'bash -c "{} --daemon; tail -f /dev/null"'.format(CLICKHOUSE_START_COMMAND)
2017-05-19 18:54:05 +00:00
# docker-compose file template for a single instance, rendered with str.format().
# NOTE(review): {odbc_ini_path} is expected to expand to an extra volume list item
# or to nothing, and {networks}/{app_net}/{ipv4_address}/{ipv6_address} to the
# nested network section or to empty strings - confirm against the code that
# renders the template.
DOCKER_COMPOSE_TEMPLATE = '''
version: '2.2'
services:
    {name}:
        image: {image}
        hostname: {hostname}
        volumes:
            - {binary_path}:/usr/bin/clickhouse:ro
            - {odbc_bridge_bin_path}:/usr/bin/clickhouse-odbc-bridge:ro
            - {configs_dir}:/etc/clickhouse-server/
            - {db_dir}:/var/lib/clickhouse/
            - {logs_dir}:/var/log/clickhouse-server/
            {odbc_ini_path}
        entrypoint: {entrypoint_cmd}
        cap_add:
            - SYS_PTRACE
        depends_on: {depends_on}
        user: '{user}'
        env_file:
            - {env_file}
        security_opt:
            - label:disable
        {networks}
            {app_net}
                {ipv4_address}
                {ipv6_address}
'''
2019-06-04 20:59:31 +00:00
2017-05-19 18:54:05 +00:00
class ClickHouseInstance :
2018-08-22 15:42:27 +00:00
2017-05-19 18:54:05 +00:00
def __init__(
        self, cluster, base_path, name, custom_config_dir, custom_main_configs, custom_user_configs, macros,
        with_zookeeper, zookeeper_config_path, with_mysql, with_kafka, with_mongo, base_configs_dir, server_bin_path, odbc_bridge_bin_path,
        clickhouse_path_dir, with_odbc_drivers, hostname=None, env_variables=None, image="yandex/clickhouse-integration-test",
        stay_alive=False, ipv4_address=None, ipv6_address=None):
    """Describe one ClickHouse instance of `cluster`; nothing is created on disk or in docker here.

    Relative paths (custom_config_dir, the config lists, clickhouse_path_dir) are
    resolved against `base_path`. hostname defaults to `name`.
    """
    self.name = name
    # Snapshot of the cluster-wide docker-compose command at construction time.
    self.base_cmd = cluster.base_cmd[:]
    self.docker_id = cluster.get_instance_docker_id(self.name)
    self.cluster = cluster
    self.hostname = hostname if hostname is not None else self.name

    self.custom_config_dir = p.abspath(p.join(base_path, custom_config_dir)) if custom_config_dir else None
    self.custom_main_config_paths = [p.abspath(p.join(base_path, c)) for c in custom_main_configs or []]
    self.custom_user_config_paths = [p.abspath(p.join(base_path, c)) for c in custom_user_configs or []]
    self.clickhouse_path_dir = p.abspath(p.join(base_path, clickhouse_path_dir)) if clickhouse_path_dir else None
    self.macros = macros if macros is not None else {}
    self.with_zookeeper = with_zookeeper
    self.zookeeper_config_path = zookeeper_config_path

    self.base_configs_dir = base_configs_dir
    self.server_bin_path = server_bin_path
    self.odbc_bridge_bin_path = odbc_bridge_bin_path

    self.with_mysql = with_mysql
    self.with_kafka = with_kafka
    self.with_mongo = with_mongo

    self.path = p.join(self.cluster.instances_dir, name)
    self.docker_compose_path = p.join(self.path, 'docker_compose.yml')
    # A fresh dict per instance: a mutable default would be shared between instances.
    self.env_variables = env_variables if env_variables is not None else {}
    if with_odbc_drivers:
        # "<host path>:<container path>" mapping for odbc.ini, stored next to the compose file.
        self.odbc_ini_path = os.path.dirname(self.docker_compose_path) + "/odbc.ini:/etc/odbc.ini"
        self.with_mysql = True
    else:
        self.odbc_ini_path = ""

    # Runtime state; assigned by ClickHouseCluster.start() and reset by shutdown().
    self.docker_client = None
    self.ip_address = None
    self.client = None
    self.default_timeout = 20.0  # 20 sec
    self.image = image
    self.stay_alive = stay_alive
    self.ipv4_address = ipv4_address
    self.ipv6_address = ipv6_address
2017-05-30 11:49:17 +00:00
2017-08-14 01:29:19 +00:00
# Connects to the instance via clickhouse-client, sends a query (1st argument) and returns the answer
def query(self, sql, stdin=None, timeout=None, settings=None, user=None, ignore_error=False):
    """Run `sql` through this instance's client and return what the client returns."""
    return self.client.query(sql, stdin, timeout, settings, user, ignore_error)
def query_with_retry(self, sql, stdin=None, timeout=None, settings=None, user=None, ignore_error=False,
                     retry_count=20, sleep_time=0.5, check_callback=lambda x: True):
    """Run `sql` up to `retry_count` times until `check_callback(result)` is true.

    Sleeps `sleep_time` seconds after every failed or rejected attempt.
    Returns the first accepted result; if none was accepted, returns the last
    result obtained. Raises Exception when no attempt produced a result.
    """
    result = None
    for i in range(retry_count):
        try:
            result = self.query(sql, stdin, timeout, settings, user, ignore_error)
            if check_callback(result):
                return result

            time.sleep(sleep_time)
        except Exception as ex:
            print("Retry {} got exception {}".format(i + 1, ex))
            time.sleep(sleep_time)

    if result is not None:
        return result
    raise Exception("Can't execute query {}".format(sql))
2017-05-30 11:49:17 +00:00
2017-07-26 12:31:55 +00:00
# As query() but doesn't wait response and returns response handler
def get_query_request(self, *args, **kwargs):
    """Forward all arguments to the client's get_query_request() and return its result."""
    return self.client.get_query_request(*args, **kwargs)
2019-04-07 00:31:20 +00:00
# Connects to the instance via clickhouse-client, sends a query (1st argument), expects an error and return its code
def query_and_get_error(self, sql, stdin=None, timeout=None, settings=None, user=None):
    """Run `sql` through the client's query_and_get_error() and return its result."""
    return self.client.query_and_get_error(sql, stdin, timeout, settings, user)
2019-03-30 18:40:52 +00:00
# Connects to the instance via HTTP interface, sends a query and returns the answer
def http_query(self, sql, data=None):
    """Send `sql` (URL-quoted) to port 8123 of this instance and return the response body.

    data - optional request body, handed to urlopen() unchanged.
    """
    url = "http://" + self.ip_address + ":8123/?query=" + urllib.quote(sql, safe='')
    return urllib.urlopen(url, data).read()
2019-03-14 13:39:47 +00:00
def restart_clickhouse(self, stop_start_wait_sec=5):
    """Kill the ClickHouse server inside the container and start it again as a daemon.

    Waits `stop_start_wait_sec` seconds between the kill and the start.
    Raises Exception unless the instance was created with stay_alive=True.
    """
    if not self.stay_alive:
        raise Exception("clickhouse can be restarted only with stay_alive=True instance")

    self.exec_in_container(["bash", "-c", "pkill clickhouse"], user='root')
    time.sleep(stop_start_wait_sec)
    self.exec_in_container(["bash", "-c", "{} --daemon".format(CLICKHOUSE_START_COMMAND)], user='root')
2017-05-19 18:54:05 +00:00
2019-02-22 10:55:12 +00:00
def exec_in_container(self, cmd, detach=False, **kwargs):
    """Run `cmd` (a list of strings) inside this instance's container and return its output.

    detach - passed to exec_start()
    kwargs - passed to exec_create() (for example user='root')

    Raises Exception if the command finished with a non-zero exit code.
    """
    container = self.get_docker_handle()
    exec_id = self.docker_client.api.exec_create(container.id, cmd, **kwargs)
    output = self.docker_client.api.exec_start(exec_id, detach=detach)

    output = output.decode('utf8')
    exit_code = self.docker_client.api.exec_inspect(exec_id)['ExitCode']
    if exit_code:
        raise Exception('Cmd "{}" failed! Return code {}. Output: {}'.format(' '.join(cmd), exit_code, output))
    return output
2019-03-29 18:10:03 +00:00
def contains_in_log(self, substring):
    """Return True if grep finds `substring` in the server log inside the container.

    `substring` is pasted into a single-quoted shell argument, so it is
    interpreted as a grep pattern and must not contain a single quote.
    """
    # `|| true` keeps a no-match grep (exit code 1) from raising in exec_in_container().
    grep_cmd = "grep '{}' /var/log/clickhouse-server/clickhouse-server.log || true".format(substring)
    result = self.exec_in_container(["bash", "-c", grep_cmd])
    return len(result) > 0
2019-02-21 17:34:19 +00:00
def copy_file_to_container(self, local_path, dest_path):
    """Copy the local file `local_path` to `dest_path` inside the container.

    The content travels base64-encoded on the command line of a shell run in
    the container, so this is only suitable for small files.
    """
    # Binary mode: the bytes are encoded untouched, whatever the file contains.
    with open(local_path, 'rb') as fdata:
        data = fdata.read()
    encoded_data = base64.b64encode(data).decode('ascii')
    self.exec_in_container(["bash", "-c", "echo {} | base64 --decode > {}".format(encoded_data, dest_path)])
2017-08-02 14:42:35 +00:00
2017-05-30 11:49:17 +00:00
def get_docker_handle(self):
    """Look up and return the docker container object of this instance by its container name."""
    return self.docker_client.containers.get(self.docker_id)
2017-05-19 18:54:05 +00:00
2017-05-30 11:49:17 +00:00
def stop(self):
    """Stop the docker container of this instance."""
    self.get_docker_handle().stop()
2017-05-30 11:49:17 +00:00
def start(self):
    """Start the docker container of this instance."""
    self.get_docker_handle().start()
def wait_for_start(self, deadline=None, timeout=None):
    """Block until something accepts TCP connections on port 9000 of this instance.

    deadline - absolute time (as returned by time.time()) after which to give up
    timeout - seconds from now; when given it overrides `deadline`
    With neither given the method waits indefinitely.

    Raises Exception if the container has exited or the deadline has passed.
    """
    start_time = time.time()
    if timeout is not None:
        deadline = start_time + timeout

    while True:
        handle = self.get_docker_handle()
        status = handle.status
        if status == 'exited':
            raise Exception("Instance `{}' failed to start. Container status: {}, logs: {}".format(self.name, status, handle.logs()))

        current_time = time.time()
        if deadline is not None and current_time >= deadline:
            raise Exception("Timed out while waiting for instance `{}' with ip address {} to start. "
                            "Container status: {}".format(self.name, self.ip_address, status))
        # None means "no limit" for settimeout(); otherwise the value is positive here.
        time_left = deadline - current_time if deadline is not None else None

        # Repeatedly poll the instance address until there is something that listens there.
        # Usually it means that ClickHouse is ready to accept queries.
        # The socket is created outside the try so that `finally` never sees an unbound name.
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.settimeout(time_left)
            sock.connect((self.ip_address, 9000))
            return
        except socket.timeout:
            continue
        except socket.error as e:
            if e.errno == errno.ECONNREFUSED:
                time.sleep(0.1)
            else:
                raise
        finally:
            sock.close()
2017-05-30 11:49:17 +00:00
@staticmethod
def dict_to_xml(dictionary):
    """Serialize `dictionary` to pretty-printed XML under a <yandex> root element."""
    xml_str = dicttoxml(dictionary, custom_root="yandex", attr_type=False)
    return xml.dom.minidom.parseString(xml_str).toprettyxml()
2018-08-22 15:42:27 +00:00
@property
def odbc_drivers(self):
    """Describe the ODBC data sources configured for this instance.

    Returns a dict with one entry per driver (SQLite3, MySQL, PostgreSQL),
    each mapping option names to string values, or an empty dict when
    ``self.odbc_ini_path`` is falsy.
    """
    if not self.odbc_ini_path:
        return {}

    sqlite_setup = {
        " DSN ": " sqlite3_odbc ",
        " Database ": " /tmp/sqliteodbc ",
        " Driver ": " /usr/lib/x86_64-linux-gnu/odbc/libsqlite3odbc.so ",
        " Setup ": " /usr/lib/x86_64-linux-gnu/odbc/libsqlite3odbc.so ",
    }
    mysql_setup = {
        " DSN ": " mysql_odbc ",
        " Driver ": " /usr/lib/x86_64-linux-gnu/odbc/libmyodbc.so ",
        " Database ": " clickhouse ",
        " Uid ": " root ",
        " Pwd ": " clickhouse ",
        " Server ": " mysql1 ",
    }
    postgres_setup = {
        " DSN ": " postgresql_odbc ",
        " Database ": " postgres ",
        " UserName ": " postgres ",
        " Password ": " mysecretpassword ",
        " Port ": " 5432 ",
        " Servername ": " postgres1 ",
        " Protocol ": " 9.3 ",
        " ReadOnly ": " No ",
        " RowVersioning ": " No ",
        " ShowSystemTables ": " No ",
        " Driver ": " /usr/lib/x86_64-linux-gnu/odbc/psqlodbca.so ",
        " Setup ": " /usr/lib/x86_64-linux-gnu/odbc/libodbcpsqlS.so ",
        " ConnSettings ": " ",
    }
    return {
        " SQLite3 ": sqlite_setup,
        " MySQL ": mysql_setup,
        " PostgreSQL ": postgres_setup,
    }
def _create_odbc_config_file(self):
    """Write the ODBC ini file described by ``self.odbc_drivers``.

    The target file is the first component of ``self.odbc_ini_path``. Each
    driver becomes one section headed by its DSN, followed by one line per
    remaining option.
    """
    ini_file_path = self.odbc_ini_path.split(' : ')[0]
    with open(ini_file_path, ' w ') as ini_file:
        for setup in self.odbc_drivers.values():
            ini_file.write(" [ {} ] \n ".format(setup[" DSN "]))
            for option, option_value in setup.items():
                # The DSN is already used as the section header.
                if option == " DSN ":
                    continue
                ini_file.write(option + " = " + option_value + " \n ")
2017-05-30 11:49:17 +00:00
2017-05-19 18:54:05 +00:00
def create_dir(self, destroy_dir=True):
    """Build the on-disk layout of the instance and render its docker-compose file.

    Creates ``self.path`` with sub-directories for configs, database and
    logs, copies the base and custom configuration files into place, writes
    the macros config, the env file and (when ``self.odbc_ini_path`` is set)
    the ODBC ini file, then formats ``DOCKER_COMPOSE_TEMPLATE`` into
    ``self.docker_compose_path``.

    :param destroy_dir: when true, an existing instance directory is removed
        first; when false and the directory already exists, nothing is done.
    """
    if not destroy_dir and p.exists(self.path):
        return
    if destroy_dir:
        self.destroy_dir()

    os.makedirs(self.path)

    # Base configuration files.
    configs_root = p.abspath(p.join(self.path, ' configs '))
    os.mkdir(configs_root)
    for base_config_name in (' config.xml ', ' users.xml '):
        shutil.copy(p.join(self.base_configs_dir, base_config_name), configs_root)

    # conf.d is used by all utils with any config,
    # config.d is used by server with main config.xml.
    conf_d = p.abspath(p.join(configs_root, ' conf.d '))
    config_d = p.abspath(p.join(configs_root, ' config.d '))
    users_d = p.abspath(p.join(configs_root, ' users.d '))
    for config_subdir in (conf_d, config_d, users_d):
        os.mkdir(config_subdir)

    shutil.copy(p.join(HELPERS_DIR, ' common_instance_config.xml '), config_d)

    # Generate and write macros file; the copy keeps self.macros untouched.
    instance_macros = self.macros.copy()
    instance_macros[' instance '] = self.name
    with open(p.join(config_d, ' macros.xml '), ' w ') as macros_file:
        macros_file.write(self.dict_to_xml({" macros ": instance_macros}))

    # Put ZooKeeper config
    if self.with_zookeeper:
        shutil.copy(self.zookeeper_config_path, conf_d)

    # Copy config dir
    if self.custom_config_dir:
        distutils.dir_util.copy_tree(self.custom_config_dir, configs_root)

    # Copy config.d configs
    for main_config_path in self.custom_main_config_paths:
        shutil.copy(main_config_path, config_d)

    # Copy users.d configs
    for user_config_path in self.custom_user_config_paths:
        shutil.copy(user_config_path, users_d)

    database_dir = p.abspath(p.join(self.path, ' database '))
    os.mkdir(database_dir)
    if self.clickhouse_path_dir is not None:
        distutils.dir_util.copy_tree(self.clickhouse_path_dir, database_dir)

    log_dir = p.abspath(p.join(self.path, ' logs '))
    os.mkdir(log_dir)

    # Services listed in the compose file's depends_on entry.
    required_services = []
    if self.with_mysql:
        required_services.append(" mysql1 ")
    if self.with_kafka:
        required_services.append(" kafka1 ")
    if self.with_zookeeper:
        required_services.extend([" zoo1 ", " zoo2 ", " zoo3 "])

    env_file_path = _create_env_file(os.path.dirname(self.docker_compose_path), self.env_variables)

    if self.odbc_ini_path:
        self._create_odbc_config_file()
        odbc_ini_entry = ' - ' + self.odbc_ini_path
    else:
        odbc_ini_entry = " "

    entrypoint = CLICKHOUSE_STAY_ALIVE_COMMAND if self.stay_alive else CLICKHOUSE_START_COMMAND

    # The network section is only filled in when an explicit address is requested.
    has_explicit_address = self.ipv4_address is not None or self.ipv6_address is not None
    networks_entry = " networks: " if has_explicit_address else " "
    app_net_entry = " default: " if has_explicit_address else " "
    if self.ipv4_address is None:
        ipv4_entry = " "
    else:
        ipv4_entry = " ipv4_address: " + self.ipv4_address
    if self.ipv6_address is None:
        ipv6_entry = " "
    else:
        ipv6_entry = " ipv6_address: " + self.ipv6_address

    with open(self.docker_compose_path, ' w ') as compose_file:
        compose_file.write(DOCKER_COMPOSE_TEMPLATE.format(
            image=self.image,
            name=self.name,
            hostname=self.hostname,
            binary_path=self.server_bin_path,
            odbc_bridge_bin_path=self.odbc_bridge_bin_path,
            configs_dir=configs_root,
            config_d_dir=config_d,
            db_dir=database_dir,
            logs_dir=log_dir,
            depends_on=str(required_services),
            user=os.getuid(),
            env_file=env_file_path,
            odbc_ini_path=odbc_ini_entry,
            entrypoint_cmd=entrypoint,
            networks=networks_entry,
            app_net=app_net_entry,
            ipv4_address=ipv4_entry,
            ipv6_address=ipv6_entry,
        ))
2017-05-19 18:54:05 +00:00
def destroy_dir(self):
    """Remove the instance directory ``self.path`` and its contents, if it exists."""
    if not p.exists(self.path):
        return
    shutil.rmtree(self.path)