import os import random import socket import subprocess import sys import threading import time class ServerThread(threading.Thread): DEFAULT_RETRIES = 3 DEFAULT_SERVER_DELAY = 0.5 # seconds DEFAULT_CONNECTION_TIMEOUT = 1.0 # seconds def __init__(self, bin_prefix, tmp_dir): self._bin = bin_prefix + '-server' self._lock = threading.Lock() threading.Thread.__init__(self) self._lock.acquire() self.tmp_dir = tmp_dir self.log_dir = os.path.join(tmp_dir, 'log') self.etc_dir = os.path.join(tmp_dir, 'etc') self.server_config = os.path.join(self.etc_dir, 'server-config.xml') self.users_config = os.path.join(self.etc_dir, 'users.xml') self.dicts_config = os.path.join(self.etc_dir, 'dictionaries.xml') self.client_config = os.path.join(self.etc_dir, 'client-config.xml') os.makedirs(self.log_dir) os.makedirs(self.etc_dir) def _choose_ports_and_args(self): port_base = random.SystemRandom().randrange(10000, 60000) self.tcp_port = port_base + 1 self.http_port = port_base + 2 self.inter_port = port_base + 3 self.tcps_port = port_base + 4 self.https_port = port_base + 5 self.odbc_port = port_base + 6 self.proxy_port = port_base + 7 self.postgresql_port = port_base + 8 self._args = [ '--config-file={config_path}'.format(config_path=self.server_config), '--', '--tcp_port={tcp_port}'.format(tcp_port=self.tcp_port), '--http_port={http_port}'.format(http_port=self.http_port), '--interserver_http_port={inter_port}'.format(inter_port=self.inter_port), '--tcp_with_proxy_port={proxy_port}'.format(proxy_port=self.proxy_port), '--postgresql_port={psql_port}'.format(psql_port=self.postgresql_port), # TODO: SSL certificate is not specified '--tcp_port_secure={tcps_port}'.format(tcps_port=self.tcps_port), ] with open(self.server_config, 'w') as f: f.write(ServerThread.DEFAULT_SERVER_CONFIG.format( tmp_dir=self.tmp_dir, log_dir=self.log_dir, tcp_port=self.tcp_port)) with open(self.users_config, 'w') as f: f.write(ServerThread.DEFAULT_USERS_CONFIG) with open(self.dicts_config, 'w') as f: f.write(ServerThread.DEFAULT_DICTIONARIES_CONFIG.format(tcp_port=self.tcp_port)) with open(self.client_config, 'w') as f: f.write(ServerThread.DEFAULT_CLIENT_CONFIG) def run(self): retries = ServerThread.DEFAULT_RETRIES while retries: self._choose_ports_and_args() print('Start clickhouse-server with args:', self._args) self._proc = subprocess.Popen([self._bin] + self._args, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE) while self._proc.poll() is None: try: time.sleep(ServerThread.DEFAULT_SERVER_DELAY) s = socket.create_connection(('localhost', self.tcp_port), ServerThread.DEFAULT_CONNECTION_TIMEOUT) s.sendall(b'G') # trigger expected "bad" HELLO response s.recv(1024) # FIXME: read whole buffered response s.shutdown(socket.SHUT_RDWR) s.close() except Exception: # Failed to connect to server - try again continue else: break # If process has died then try to fetch output before releasing lock if self._proc.returncode is not None: stdout, stderr = self._proc.communicate() print(stdout.decode('utf-8'), file=sys.stderr) print(stderr.decode('utf-8'), file=sys.stderr) if self._proc.returncode == 70: # Address already in use retries -= 1 continue break self._lock.release() if not retries: print('Failed to start server', file=sys.stderr) return while self._proc.returncode is None: self._proc.communicate() def wait(self): self._lock.acquire() if self._proc.returncode is not None: self.join() self._lock.release() return self._proc.returncode def stop(self): if self._proc.returncode is None: self._proc.terminate() self.join() print('Stopped clickhouse-server') ServerThread.DEFAULT_SERVER_CONFIG = \ """\ trace {log_dir}/server/stdout.txt {log_dir}/server/stderr.txt never 1 :: {tmp_dir}/data/ {tmp_dir}/tmp/ {tmp_dir}/data/access/ users.xml dictionaries.xml 5368709120 3 Europe/Moscow Hello, world! s1 r1 Version sum sum 0 600 172800 6000 max 0 600 172800 6000 TOPSECRET.TOPSECRET [hidden] localhost {tcp_port} localhost {tcp_port} localhost {tcp_port} 127.0.0.1 {tcp_port} 127.0.0.2 {tcp_port} localhost {tcp_port} localhost 1 shard_0 localhost {tcp_port} shard_1 localhost {tcp_port} true 127.0.0.1 {tcp_port} true 127.0.0.2 {tcp_port} true 127.0.0.1 {tcp_port} foo 127.0.0.2 {tcp_port} foo 127.0.0.1 {tcp_port} 127.0.0.2 {tcp_port} shard_0 localhost {tcp_port} shard_1 localhost {tcp_port} memory testkeeper /clickhouse/task_queue/ddl system part_log
system query_log
system query_thread_log
system text_log
system trace_log
""" ServerThread.DEFAULT_USERS_CONFIG = \ """\ 10000000000 0 random 1 ::/0 default default 1 ::1 127.0.0.1 readonly default 3600 0 0 0 0 0 """ ServerThread.DEFAULT_DICTIONARIES_CONFIG = \ """\ flat_ints localhost {tcp_port} default system ints
0 key i8 Int8 0 i16 Int16 0 i32 Int32 0 i64 Int64 0 u8 UInt8 0 u16 UInt16 0 u32 UInt32 0 u64 UInt64 0
hashed_ints localhost {tcp_port} default system ints
0 key i8 Int8 0 i16 Int16 0 i32 Int32 0 i64 Int64 0 u8 UInt8 0 u16 UInt16 0 u32 UInt32 0 u64 UInt64 0
hashed_sparse_ints localhost {tcp_port} default system ints
0 key i8 Int8 0 i16 Int16 0 i32 Int32 0 i64 Int64 0 u8 UInt8 0 u16 UInt16 0 u32 UInt32 0 u64 UInt64 0
cache_ints localhost {tcp_port} default system ints
0 1000 key i8 Int8 0 i16 Int16 0 i32 Int32 0 i64 Int64 0 u8 UInt8 0 u16 UInt16 0 u32 UInt32 0 u64 UInt64 0
complex_hashed_ints localhost {tcp_port} default system ints
0 key UInt64 i8 Int8 0 i16 Int16 0 i32 Int32 0 i64 Int64 0 u8 UInt8 0 u16 UInt16 0 u32 UInt32 0 u64 UInt64 0
complex_cache_ints localhost {tcp_port} default system ints
0 1000 key UInt64 i8 Int8 0 i16 Int16 0 i32 Int32 0 i64 Int64 0 u8 UInt8 0 u16 UInt16 0 u32 UInt32 0 u64 UInt64 0
flat_strings localhost {tcp_port} default system strings
0 key str String
hashed_strings localhost {tcp_port} default system strings
0 key str String
cache_strings localhost {tcp_port} default system strings
0 1000 key str String
complex_hashed_strings localhost {tcp_port} default system strings
0 key UInt64 str String
complex_cache_strings localhost {tcp_port} default system strings
0 1000 key UInt64 str String
flat_decimals localhost {tcp_port} default system decimals
0 key d32 Decimal32(4) 0 d64 Decimal64(6) 0 d128 Decimal128(1) 0
hashed_decimals localhost {tcp_port} default system decimals
0 key d32 Decimal32(4) 0 d64 Decimal64(6) 0 d128 Decimal128(1) 0
cache_decimals localhost {tcp_port} default system decimals
0 1000 key d32 Decimal32(4) 0 d64 Decimal64(6) 0 d128 Decimal128(1) 0
complex_hashed_decimals localhost {tcp_port} default system decimals
0 key UInt64 d32 Decimal32(4) 0 d64 Decimal64(6) 0 d128 Decimal128(1) 0
complex_cache_decimals localhost {tcp_port} default system decimals
0 1000 key UInt64 d32 Decimal32(4) 0 d64 Decimal64(6) 0 d128 Decimal128(1) 0
simple_executable_cache_dictionary_no_implicit_key id UInt64 value String echo "1\tValue" TabSeparated false 10000 300 simple_executable_cache_dictionary_implicit_key id UInt64 value String echo "Value" TabSeparated true 10000 300 complex_executable_cache_dictionary_no_implicit_key id UInt64 id_key String value String echo "1\tFirstKey\tValue" TabSeparated false 10000 300 complex_executable_cache_dictionary_implicit_key id UInt64 id_key String value String echo "Value" TabSeparated true 10000 300
""" ServerThread.DEFAULT_CLIENT_CONFIG = \ """\ true true sslv2,sslv3 true AcceptCertificateHandler """