import os
import random
import socket
import subprocess
import sys
import threading
import time
class ServerThread(threading.Thread):
    """Run a clickhouse-server child process from a background thread.

    The constructor prepares ``<tmp_dir>/log`` and ``<tmp_dir>/etc`` and takes
    an internal lock.  ``run()`` (executed by ``start()``) writes the config
    files, launches the server on a randomly chosen block of ports and polls
    the TCP port until the server answers or the process exits; only then is
    the lock released, which is what unblocks ``wait()``.

    The templates ``DEFAULT_SERVER_CONFIG``, ``DEFAULT_USERS_CONFIG``,
    ``DEFAULT_DICTIONARIES_CONFIG`` and ``DEFAULT_CLIENT_CONFIG`` are read
    from the class by ``_choose_ports_and_args()`` but are not defined in the
    class body; they must be assigned on the class before ``run()`` executes.
    """

    # How many times to relaunch on a fresh port block after exit code 70.
    DEFAULT_RETRIES = 3
    # Pause between two readiness probes.
    DEFAULT_SERVER_DELAY = 0.5  # seconds
    # Timeout of a single readiness-probe connection.
    DEFAULT_CONNECTION_TIMEOUT = 1.0  # seconds

    def __init__(self, bin_prefix, tmp_dir):
        """Prepare directories and paths; the server itself starts in run().

        Args:
            bin_prefix: Path prefix of the binaries; ``'-server'`` is appended
                to obtain the executable that is launched.
            tmp_dir: Working directory; ``log`` and ``etc`` sub-directories
                are created inside it (already existing ones are reused).
        """
        super().__init__()
        self._bin = bin_prefix + '-server'
        # Stays None until Popen succeeds, so wait()/stop() can tell that
        # nothing was ever launched.
        self._proc = None
        # Held from construction until run() has finished the start-up phase;
        # wait() blocks on it.
        self._lock = threading.Lock()
        self._lock.acquire()

        self.tmp_dir = tmp_dir
        self.log_dir = os.path.join(tmp_dir, 'log')
        self.etc_dir = os.path.join(tmp_dir, 'etc')
        self.server_config = os.path.join(self.etc_dir, 'server-config.xml')
        self.users_config = os.path.join(self.etc_dir, 'users.xml')
        self.dicts_config = os.path.join(self.etc_dir, 'dictionaries.xml')
        self.client_config = os.path.join(self.etc_dir, 'client-config.xml')

        # exist_ok lets a second ServerThread reuse the same tmp_dir instead
        # of failing with FileExistsError.
        os.makedirs(self.log_dir, exist_ok=True)
        os.makedirs(self.etc_dir, exist_ok=True)

    def _choose_ports_and_args(self):
        """Pick a random port block, build the command line, write configs.

        Sets the ``*_port`` attributes and ``self._args`` and (re)writes the
        four configuration files under ``self.etc_dir``.
        """
        # Highest port used is port_base + 8 <= 60007, i.e. a valid port.
        port_base = random.SystemRandom().randrange(10000, 60000)
        self.tcp_port = port_base + 1
        self.http_port = port_base + 2
        self.inter_port = port_base + 3
        self.tcps_port = port_base + 4
        self.https_port = port_base + 5
        self.odbc_port = port_base + 6
        self.proxy_port = port_base + 7
        self.postgresql_port = port_base + 8

        # NOTE(review): CPython's threading.Thread also keeps a private
        # `_args` attribute; overwriting it looks harmless because run() is
        # overridden here - confirm before relying on it.
        self._args = [
            '--config-file={config_path}'.format(config_path=self.server_config),
            '--',
            '--tcp_port={tcp_port}'.format(tcp_port=self.tcp_port),
            '--http_port={http_port}'.format(http_port=self.http_port),
            '--interserver_http_port={inter_port}'.format(inter_port=self.inter_port),
            '--tcp_with_proxy_port={proxy_port}'.format(proxy_port=self.proxy_port),
            '--postgresql_port={psql_port}'.format(psql_port=self.postgresql_port),
            # TODO: SSL certificate is not specified '--tcp_port_secure={tcps_port}'.format(tcps_port=self.tcps_port),
        ]

        with open(self.server_config, 'w') as f:
            f.write(ServerThread.DEFAULT_SERVER_CONFIG.format(
                tmp_dir=self.tmp_dir, log_dir=self.log_dir, tcp_port=self.tcp_port))

        with open(self.users_config, 'w') as f:
            f.write(ServerThread.DEFAULT_USERS_CONFIG)

        with open(self.dicts_config, 'w') as f:
            f.write(ServerThread.DEFAULT_DICTIONARIES_CONFIG.format(tcp_port=self.tcp_port))

        with open(self.client_config, 'w') as f:
            f.write(ServerThread.DEFAULT_CLIENT_CONFIG)

    def run(self):
        """Launch the server, wait until it answers, then supervise it.

        The lock taken in ``__init__`` is released once the start-up phase is
        over - including when launching raised - so ``wait()`` can never be
        left blocked forever.
        """
        retries = ServerThread.DEFAULT_RETRIES

        try:
            while retries:
                self._choose_ports_and_args()
                print('Start clickhouse-server with args:', self._args)
                self._proc = subprocess.Popen(
                    [self._bin] + self._args, shell=False,
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)

                # Probe the TCP port until the server answers or the process dies.
                while self._proc.poll() is None:
                    time.sleep(ServerThread.DEFAULT_SERVER_DELAY)
                    try:
                        # The context manager closes the socket even when
                        # sendall/recv/shutdown fail after a successful connect.
                        with socket.create_connection(
                                ('localhost', self.tcp_port),
                                ServerThread.DEFAULT_CONNECTION_TIMEOUT) as s:
                            s.sendall(b'G')  # trigger expected "bad" HELLO response
                            s.recv(1024)  # FIXME: read whole buffered response
                            s.shutdown(socket.SHUT_RDWR)
                    except OSError:
                        # Failed to talk to the server - try again
                        continue
                    else:
                        break

                # If process has died then try to fetch output before releasing lock
                if self._proc.returncode is not None:
                    stdout, stderr = self._proc.communicate()
                    print(stdout.decode('utf-8'), file=sys.stderr)
                    print(stderr.decode('utf-8'), file=sys.stderr)

                    if self._proc.returncode == 70:  # Address already in use
                        retries -= 1
                        continue

                break
        finally:
            # Unblock wait() no matter how the start-up phase ended.
            self._lock.release()

        if not retries:
            print('Failed to start server', file=sys.stderr)
            return

        # Keep draining the pipes until the server exits.
        while self._proc.returncode is None:
            self._proc.communicate()

    def wait(self):
        """Block until start-up has finished and report its outcome.

        Returns:
            ``None`` when the server process is still running after the
            start-up phase, otherwise the exit code of the dead process.

        Raises:
            RuntimeError: if no server process was ever launched (for example
                because the executable could not be started).
        """
        with self._lock:
            if self._proc is None:
                raise RuntimeError('clickhouse-server process was never launched')
            if self._proc.returncode is not None:
                # Process is already dead, so run() is about to return.
                self.join()
            return self._proc.returncode

    def stop(self):
        """Terminate the server if it is still running and join the thread."""
        if self._proc is not None and self._proc.returncode is None:
            self._proc.terminate()
        self.join()
        print('Stopped clickhouse-server')
# Template for the server configuration file. ServerThread writes it to
# <etc_dir>/server-config.xml after str.format() substitution of the
# {tmp_dir}, {log_dir} and {tcp_port} placeholders.
# NOTE(review): the text below contains only bare values and no XML markup,
# although it is written to an .xml file - presumably the tags were lost
# somewhere; confirm against the upstream file before editing.
ServerThread.DEFAULT_SERVER_CONFIG = \
"""\
trace
{log_dir}/server/stdout.txt
{log_dir}/server/stderr.txt
never
1
::
{tmp_dir}/data/
{tmp_dir}/tmp/
{tmp_dir}/data/access/
users.xml
dictionaries.xml
5368709120
3
Europe/Moscow
Hello, world!
s1
r1
Version
sum
sum
0
600
172800
6000
max
0
600
172800
6000
TOPSECRET.TOPSECRET
[hidden]
localhost
{tcp_port}
localhost
{tcp_port}
localhost
{tcp_port}
127.0.0.1
{tcp_port}
127.0.0.2
{tcp_port}
localhost
{tcp_port}
localhost
1
shard_0
localhost
{tcp_port}
shard_1
localhost
{tcp_port}
true
127.0.0.1
{tcp_port}
true
127.0.0.2
{tcp_port}
true
127.0.0.1
{tcp_port}
foo
127.0.0.2
{tcp_port}
foo
shard_0
localhost
{tcp_port}
shard_1
localhost
{tcp_port}
memory
testkeeper
/clickhouse/task_queue/ddl
system
system
system
system
system
"""
# Content of the users configuration file. ServerThread writes it verbatim
# (no placeholder substitution) to <etc_dir>/users.xml.
# NOTE(review): only bare values are present, no XML markup - presumably the
# tags were lost somewhere; confirm against the upstream file before editing.
ServerThread.DEFAULT_USERS_CONFIG = \
"""\
10000000000
0
random
1
::/0
default
default
1
::1
127.0.0.1
readonly
default
3600
0
0
0
0
0
"""
# Template for the dictionaries configuration file. ServerThread writes it to
# <etc_dir>/dictionaries.xml after calling str.format(tcp_port=...); no
# {tcp_port} placeholder is visible in the text as it appears here.
# NOTE(review): the entries look like dictionary names followed by attribute
# names and types, with the XML markup missing - presumably the tags were
# lost somewhere; confirm against the upstream file before editing.
ServerThread.DEFAULT_DICTIONARIES_CONFIG = \
"""\
flat_ints
0
key
i8
Int8
0
i16
Int16
0
i32
Int32
0
i64
Int64
0
u8
UInt8
0
u16
UInt16
0
u32
UInt32
0
u64
UInt64
0
hashed_ints
0
key
i8
Int8
0
i16
Int16
0
i32
Int32
0
i64
Int64
0
u8
UInt8
0
u16
UInt16
0
u32
UInt32
0
u64
UInt64
0
hashed_sparse_ints
0
key
i8
Int8
0
i16
Int16
0
i32
Int32
0
i64
Int64
0
u8
UInt8
0
u16
UInt16
0
u32
UInt32
0
u64
UInt64
0
cache_ints
0
1000
key
i8
Int8
0
i16
Int16
0
i32
Int32
0
i64
Int64
0
u8
UInt8
0
u16
UInt16
0
u32
UInt32
0
u64
UInt64
0
complex_hashed_ints
0
key
UInt64
i8
Int8
0
i16
Int16
0
i32
Int32
0
i64
Int64
0
u8
UInt8
0
u16
UInt16
0
u32
UInt32
0
u64
UInt64
0
complex_cache_ints
0
1000
key
UInt64
i8
Int8
0
i16
Int16
0
i32
Int32
0
i64
Int64
0
u8
UInt8
0
u16
UInt16
0
u32
UInt32
0
u64
UInt64
0
flat_strings
0
key
str
String
hashed_strings
0
key
str
String
cache_strings
0
1000
key
str
String
complex_hashed_strings
0
key
UInt64
str
String
complex_cache_strings
0
1000
key
UInt64
str
String
flat_decimals
0
key
d32
Decimal32(4)
0
d64
Decimal64(6)
0
d128
Decimal128(1)
0
hashed_decimals
0
key
d32
Decimal32(4)
0
d64
Decimal64(6)
0
d128
Decimal128(1)
0
cache_decimals
0
1000
key
d32
Decimal32(4)
0
d64
Decimal64(6)
0
d128
Decimal128(1)
0
complex_hashed_decimals
0
key
UInt64
d32
Decimal32(4)
0
d64
Decimal64(6)
0
d128
Decimal128(1)
0
complex_cache_decimals
0
1000
key
UInt64
d32
Decimal32(4)
0
d64
Decimal64(6)
0
d128
Decimal128(1)
0
simple_executable_cache_dictionary_no_implicit_key
id
UInt64
value
String
10000
300
simple_executable_cache_dictionary_implicit_key
id
UInt64
value
String
10000
300
complex_executable_cache_dictionary_no_implicit_key
id
UInt64
id_key
String
value
String
10000
300
complex_executable_cache_dictionary_implicit_key
id
UInt64
id_key
String
value
String
10000
300
"""
# Content of the client configuration file. ServerThread writes it verbatim
# (no placeholder substitution) to <etc_dir>/client-config.xml.
# NOTE(review): only bare values are present, no XML markup - presumably the
# tags were lost somewhere; confirm against the upstream file before editing.
ServerThread.DEFAULT_CLIENT_CONFIG = \
"""\
true
true
sslv2,sslv3
true
AcceptCertificateHandler
"""