Merge pull request #30065 from azat/clickhouse-test-http-interface

clickhouse-test: replace clickhouse-driver with http interface (via http.client)
alexey-milovidov 2021-10-14 09:14:25 +03:00 committed by GitHub
commit a1d349bfff
5 changed files with 118 additions and 123 deletions

View File

@@ -67,7 +67,7 @@ RUN apt-get update \
     unixodbc \
     --yes --no-install-recommends
-RUN pip3 install numpy scipy pandas Jinja2 pandas clickhouse_driver
+RUN pip3 install numpy scipy pandas Jinja2
 # This symlink required by gcc to find lld compiler
 RUN ln -s /usr/bin/lld-${LLVM_VERSION} /usr/bin/ld.lld

View File

@@ -27,7 +27,7 @@ RUN apt-get update \
     && apt-get clean \
     && rm -rf /var/lib/apt/lists/*
-RUN pip3 install Jinja2 pandas clickhouse_driver
+RUN pip3 install Jinja2
 COPY * /

View File

@@ -34,7 +34,7 @@ RUN apt-get update -y \
     postgresql-client \
     sqlite3
-RUN pip3 install numpy scipy pandas Jinja2 clickhouse_driver
+RUN pip3 install numpy scipy pandas Jinja2
 RUN mkdir -p /tmp/clickhouse-odbc-tmp \
     && wget -nv -O - ${odbc_driver_url} | tar --strip-components=1 -xz -C /tmp/clickhouse-odbc-tmp \

View File

@@ -10,7 +10,7 @@ RUN apt-get update && env DEBIAN_FRONTEND=noninteractive apt-get install --yes \
     python3-pip \
     pylint \
     yamllint \
-    && pip3 install codespell pandas clickhouse_driver
+    && pip3 install codespell
 COPY run.sh /
 COPY process_style_check_result.py /

View File

@@ -13,6 +13,10 @@ import re
 import copy
 import traceback
 import math
+# Not requests, to avoid requiring extra dependency.
+import http.client
+import urllib.parse
+import json
 from argparse import ArgumentParser
 from typing import Tuple, Union, Optional, Dict, Set, List
@@ -34,9 +38,6 @@ import multiprocessing
 import socket
 from contextlib import closing
 
-import clickhouse_driver
-import pandas
-
 USE_JINJA = True
 try:
     import jinja2
@@ -51,53 +52,64 @@ MESSAGES_TO_RETRY = [
     "DB::Exception: Cannot enqueue query",
     "is executing longer than distributed_ddl_task_timeout" # FIXME
 ]
-error_codes = clickhouse_driver.errors.ErrorCodes
-error_codes.NOT_A_LEADER = 529
-ERROR_CODES_TO_RETRY = [
-    error_codes.ALL_CONNECTION_TRIES_FAILED,
-    error_codes.DATABASE_NOT_EMPTY,
-    error_codes.NOT_A_LEADER,
-    error_codes.UNFINISHED,
-]
 MAX_RETRIES = 3
 
 TEST_FILE_EXTENSIONS = ['.sql', '.sql.j2', '.sh', '.py', '.expect']
 
-class Client(clickhouse_driver.Client):
-    # return first column of the first row
-    def execute_one(self, *args, **kwargs):
-        return super().execute(*args, **kwargs)[0][0]
+class HTTPError(Exception):
+    def __init__(self, message=None, code=None):
+        self.message = message
+        self.code = code
+        super().__init__(message)
 
-    # return pandas.DataFrame
-    def execute_pandas(self, *args, **kwargs):
-        data = super().execute(*args, **kwargs, with_column_types=True)
-        return Client.__combine(data)
+    def __str__(self):
+        return 'Code: {}. {}'.format(self.code, self.message)
 
-    @staticmethod
-    def __combine(data):
-        cols = data[1]
-        rows = data[0]
-        header = [ i[0] for i in cols ]
-        data = pandas.DataFrame(data=rows, columns=header)
-        return data
+# Helpers to execute queries via HTTP interface.
+def clickhouse_execute_http(base_args, query, timeout=30, settings=None, default_format=None):
+    client = http.client.HTTPConnection(
+        host=base_args.tcp_host,
+        port=base_args.http_port,
+        timeout=timeout)
 
-# Helpers
-def make_clickhouse_client(base_args):
-    return Client(host=base_args.tcp_host, port=base_args.tcp_port,
-                  # hung check in stress tests may remove the database,
-                  # hence we should use 'system'.
-                  database='system',
-                  settings=get_additional_client_options_dict(base_args))
+    timeout = int(timeout)
+    params = {
+        'query': query,
 
-def clickhouse_execute_one(base_args, *args, **kwargs):
-    return make_clickhouse_client(base_args).execute_one(*args, **kwargs)
+        # hung check in stress tests may remove the database,
+        # hence we should use 'system'.
+        'database': 'system',
 
-def clickhouse_execute(base_args, *args, **kwargs):
-    return make_clickhouse_client(base_args).execute(*args, **kwargs)
+        'connect_timeout': timeout,
+        'receive_timeout': timeout,
+        'send_timeout': timeout,
 
-def clickhouse_execute_pandas(base_args, *args, **kwargs):
-    return make_clickhouse_client(base_args).execute_pandas(*args, **kwargs)
+        'http_connection_timeout': timeout,
+        'http_receive_timeout': timeout,
+        'http_send_timeout': timeout,
+    }
+    if settings is not None:
+        params.update(settings)
+    if default_format is not None:
+        params['default_format'] = default_format
+
+    client.request('POST', '/?' + base_args.client_options_query_str + urllib.parse.urlencode(params))
+    res = client.getresponse()
+    data = res.read()
+    if res.status != 200:
+        raise HTTPError(data.decode(), res.status)
+
+    return data
+
+def clickhouse_execute(base_args, query, timeout=30, settings=None):
+    return clickhouse_execute_http(base_args, query, timeout, settings).strip()
+
+def clickhouse_execute_json(base_args, query, timeout=30, settings=None):
+    data = clickhouse_execute_http(base_args, query, timeout, settings, 'JSONEachRow')
+    if not data:
+        return None
+    # JSONEachRow emits one JSON object per line; parse line by line.
+    return [json.loads(row) for row in data.strip().splitlines()]
 
 
 class Terminated(KeyboardInterrupt):
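
Note: a minimal sketch of what the new helper does on the wire, assuming a local server with the HTTP interface on the default port 8123 (host, port, and query here are illustrative, not part of the diff). The query and every setting travel as URL parameters of an otherwise empty POST, mirroring clickhouse_execute_http() above:

    import http.client
    import urllib.parse

    conn = http.client.HTTPConnection(host='localhost', port=8123, timeout=30)
    params = {'query': 'SELECT version()', 'database': 'system'}
    # Everything, including the query text, is URL-encoded into the query string;
    # the POST body stays empty.
    conn.request('POST', '/?' + urllib.parse.urlencode(params))
    res = conn.getresponse()
    data = res.read()  # raw bytes, e.g. b'21.11.1.1\n'
    if res.status != 200:
        raise RuntimeError(data.decode())
    print(data.strip())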
@@ -144,12 +156,12 @@ def get_db_engine(args, database_name):
 def get_zookeeper_session_uptime(args):
     try:
         if args.replicated_database:
-            return int(clickhouse_execute_one(args, """
+            return int(clickhouse_execute(args, """
                 SELECT min(materialize(zookeeperSessionUptime()))
                 FROM clusterAllReplicas('test_cluster_database_replicated', system.one)
             """))
         else:
-            return int(clickhouse_execute_one(args, 'SELECT zookeeperSessionUptime()'))
+            return int(clickhouse_execute(args, 'SELECT zookeeperSessionUptime()'))
     except:
         return None
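
Note: the int() wrappers keep working without an explicit decode because int() parses ASCII digits from bytes directly, surrounding whitespace included. A one-line illustration (value is made up):

    assert int(b' 1438\n') == 1438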
@@ -163,30 +175,16 @@ def need_retry(args, stdout, stderr, total_time):
         return True
     return any(msg in stdout for msg in MESSAGES_TO_RETRY) or any(msg in stderr for msg in MESSAGES_TO_RETRY)
 
-def need_retry_error(args, error, total_time):
-    # Sometimes we may get unexpected exception like "Replica is readonly" or "Shutdown is called for table"
-    # instead of "Session expired" or "Connection loss"
-    # Retry if session was expired during test execution
-    session_uptime = get_zookeeper_session_uptime(args)
-    if session_uptime is not None and session_uptime < math.ceil(total_time):
-        return True
-    if isinstance(error, clickhouse_driver.errors.Error):
-        if error.code in ERROR_CODES_TO_RETRY:
-            return True
-        if any(msg in error.message for msg in MESSAGES_TO_RETRY):
-            return True
-    return False
-
 def get_processlist(args):
     if args.replicated_database:
-        return clickhouse_execute_pandas(args, """
+        return clickhouse_execute_json(args, """
             SELECT materialize((hostName(), tcpPort())) as host, *
             FROM clusterAllReplicas('test_cluster_database_replicated', system.processes)
             WHERE query NOT LIKE '%system.processes%'
         """)
     else:
-        return clickhouse_execute_pandas(args, 'SHOW PROCESSLIST')
+        return clickhouse_execute_json(args, 'SHOW PROCESSLIST')
 
 
 # collect server stacktraces using gdb
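
Note: with the JSONEachRow default format the server emits one JSON object per line, so get_processlist() now returns a plain list of dicts instead of a pandas.DataFrame. A sketch of the shape (payload is illustrative, not from the diff):

    import json

    raw = b'{"query_id":"a1","query":"SELECT 1"}\n{"query_id":"b2","query":"SELECT 2"}\n'
    rows = [json.loads(line) for line in raw.strip().splitlines()]
    assert rows[0]['query_id'] == 'a1'
    # List truthiness replaces DataFrame.empty in the hung-query check further down:
    if not rows:
        print('processlist is empty')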
@@ -358,6 +356,7 @@ class TestCase:
         testcase_args.testcase_start_time = datetime.now()
         testcase_basename = os.path.basename(case_file)
         testcase_args.testcase_client = f"{testcase_args.client} --log_comment='{testcase_basename}'"
+        testcase_args.testcase_basename = testcase_basename
 
         if testcase_args.database:
             database = testcase_args.database
@@ -372,11 +371,9 @@
             database = 'test_{suffix}'.format(suffix=random_str())
 
-            try:
-                clickhouse_execute(args, "CREATE DATABASE " + database + get_db_engine(testcase_args, database), settings={'log_comment': testcase_basename})
-            except (TimeoutError, clickhouse_driver.errors.SocketTimeoutError):
-                total_time = (datetime.now() - testcase_args.testcase_start_time).total_seconds()
-                return None, "", f"Timeout creating database {database} before test", total_time
+            clickhouse_execute(args, "CREATE DATABASE " + database + get_db_engine(testcase_args, database), settings={
+                'log_comment': testcase_args.testcase_basename,
+            })
 
             os.environ["CLICKHOUSE_DATABASE"] = database
 
             # Set temporary directory to match the randomly generated database,
@@ -570,7 +567,7 @@
         # >> append to stderr (but not stdout since it is not used there),
         # because there are also output of per test database creation
         if not args.database:
-            pattern = '{test} > {stdout} 2>> {stderr}'
+            pattern = '{test} > {stdout} 2> {stderr}'
         else:
             pattern = '{test} > {stdout} 2> {stderr}'
@@ -593,11 +590,10 @@
         if need_drop_database:
             seconds_left = max(args.timeout - (datetime.now() - start_time).total_seconds(), 20)
             try:
-                client = make_clickhouse_client(args)
-                client.connection.force_connect()
-                with client.connection.timeout_setter(seconds_left):
-                    client.execute("DROP DATABASE " + database)
-            except (TimeoutError, clickhouse_driver.errors.SocketTimeoutError):
+                clickhouse_execute(args, "DROP DATABASE " + database, timeout=seconds_left, settings={
+                    'log_comment': args.testcase_basename,
+                })
+            except socket.timeout:
                 total_time = (datetime.now() - start_time).total_seconds()
                 return None, "", f"Timeout dropping database {database} after test", total_time
 
             shutil.rmtree(args.test_tmp_dir)
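
Note: http.client surfaces an exceeded connection timeout as socket.timeout (since Python 3.10 the same class as TimeoutError), which is why the driver-specific SocketTimeoutError disappears from the except clauses. A sketch that forces the path, assuming 10.255.255.1 is unroutable in your environment:

    import http.client
    import socket

    conn = http.client.HTTPConnection(host='10.255.255.1', port=8123, timeout=0.1)
    try:
        conn.request('GET', '/ping')
        conn.getresponse()
    except socket.timeout:
        print('timed out, as the DROP DATABASE guard above would report')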
@@ -803,7 +799,7 @@ class TestSuite:
     @staticmethod
     def readTestSuite(args, suite_dir_name: str):
         def is_data_present():
-            return int(clickhouse_execute_one(args, 'EXISTS TABLE test.hits'))
+            return int(clickhouse_execute(args, 'EXISTS TABLE test.hits'))
 
         base_dir = os.path.abspath(args.queries)
         tmp_dir = os.path.abspath(args.tmp)
@@ -976,7 +972,7 @@ def check_server_started(args):
             print(" OK")
             sys.stdout.flush()
             return True
-        except (ConnectionRefusedError, ConnectionResetError, clickhouse_driver.errors.NetworkError):
+        except (ConnectionRefusedError, ConnectionResetError):
            print('.', end='')
            sys.stdout.flush()
            retry_count -= 1
@@ -1003,31 +999,31 @@ class BuildFlags():
 def collect_build_flags(args):
     result = []
 
-    value = clickhouse_execute_one(args, "SELECT value FROM system.build_options WHERE name = 'CXX_FLAGS'")
-    if '-fsanitize=thread' in value:
+    value = clickhouse_execute(args, "SELECT value FROM system.build_options WHERE name = 'CXX_FLAGS'")
+    if b'-fsanitize=thread' in value:
         result.append(BuildFlags.THREAD)
-    elif '-fsanitize=address' in value:
+    elif b'-fsanitize=address' in value:
         result.append(BuildFlags.ADDRESS)
-    elif '-fsanitize=undefined' in value:
+    elif b'-fsanitize=undefined' in value:
         result.append(BuildFlags.UNDEFINED)
-    elif '-fsanitize=memory' in value:
+    elif b'-fsanitize=memory' in value:
         result.append(BuildFlags.MEMORY)
 
-    value = clickhouse_execute_one(args, "SELECT value FROM system.build_options WHERE name = 'BUILD_TYPE'")
-    if 'Debug' in value:
+    value = clickhouse_execute(args, "SELECT value FROM system.build_options WHERE name = 'BUILD_TYPE'")
+    if b'Debug' in value:
         result.append(BuildFlags.DEBUG)
-    elif 'RelWithDebInfo' in value or 'Release' in value:
+    elif b'RelWithDebInfo' in value or b'Release' in value:
         result.append(BuildFlags.RELEASE)
 
-    value = clickhouse_execute_one(args, "SELECT value FROM system.build_options WHERE name = 'UNBUNDLED'")
-    if value in ('ON', '1'):
+    value = clickhouse_execute(args, "SELECT value FROM system.build_options WHERE name = 'UNBUNDLED'")
+    if value in (b'ON', b'1'):
         result.append(BuildFlags.UNBUNDLED)
 
-    value = clickhouse_execute_one(args, "SELECT value FROM system.settings WHERE name = 'default_database_engine'")
-    if value == 'Ordinary':
+    value = clickhouse_execute(args, "SELECT value FROM system.settings WHERE name = 'default_database_engine'")
+    if value == b'Ordinary':
         result.append(BuildFlags.ORDINARY_DATABASE)
 
-    value = int(clickhouse_execute_one(args, "SELECT value FROM system.merge_tree_settings WHERE name = 'min_bytes_for_wide_part'"))
+    value = int(clickhouse_execute(args, "SELECT value FROM system.merge_tree_settings WHERE name = 'min_bytes_for_wide_part'"))
     if value == 0:
         result.append(BuildFlags.POLYMORPHIC_PARTS)
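
Note: clickhouse_execute() now returns the stripped HTTP response body as bytes, which is why every comparison above gains a b'' prefix; a str needle against a bytes haystack raises rather than silently mismatching (value here is illustrative):

    value = b'-O2 -fsanitize=thread'
    assert b'-fsanitize=thread' in value
    try:
        '-fsanitize=thread' in value
    except TypeError:
        pass  # bytes require bytes patterns, hence the b'' literals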
@@ -1173,9 +1169,9 @@ def main(args):
         start_time = datetime.now()
         try:
             clickhouse_execute(args, "CREATE DATABASE IF NOT EXISTS " + db_name + get_db_engine(args, db_name))
-        except Exception as e:
+        except HTTPError as e:
             total_time = (datetime.now() - start_time).total_seconds()
-            if not need_retry_error(args, e, total_time):
+            if not need_retry(args, e.message, e.message, total_time):
                 break
             create_database_retries += 1
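
Note: with need_retry_error() removed, retryable server errors are detected by substring-matching the HTTP error text; the same message is passed for both the stdout and stderr positions of need_retry(). A sketch using an entry from MESSAGES_TO_RETRY above:

    e = HTTPError('DB::Exception: Cannot enqueue query', 500)
    retry = any(msg in e.message for msg in MESSAGES_TO_RETRY)  # True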
@@ -1204,13 +1200,13 @@
     # Some queries may execute in background for some time after test was finished. This is normal.
     for _ in range(1, 60):
         processlist = get_processlist(args)
-        if processlist.empty:
+        if not processlist:
             break
         sleep(1)
 
-    if not processlist.empty:
+    if processlist:
         print(colored("\nFound hung queries in processlist:", args, "red", attrs=["bold"]))
-        print(processlist)
+        print(json.dumps(processlist, indent=4))
         print_stacktraces()
         exit_code.value = 1
@@ -1262,14 +1258,6 @@ def get_additional_client_options_url(args):
         return '&'.join(args.client_option)
     return ''
 
-def get_additional_client_options_dict(args):
-    settings = {}
-    if args.client_option:
-        for key, value in map(lambda x: x.split('='), args.client_option):
-            settings[key] = value
-    return settings
-
 if __name__ == '__main__':
     stop_time = None
     exit_code = multiprocessing.Value("i", 0)
@@ -1401,6 +1389,13 @@
     else:
         args.tcp_port = 9000
 
+    http_port = os.getenv("CLICKHOUSE_PORT_HTTP")
+    if http_port is not None:
+        args.http_port = int(http_port)
+        args.client += f" --port={http_port}"
+    else:
+        args.http_port = 8123
+
     client_database = os.getenv("CLICKHOUSE_DATABASE")
     if client_database is not None:
         args.client += f' --database={client_database}'
@@ -1423,7 +1418,11 @@
     else:
         os.environ['CLICKHOUSE_URL_PARAMS'] = ''
 
-    os.environ['CLICKHOUSE_URL_PARAMS'] += get_additional_client_options_url(args)
+        client_options_query_str = get_additional_client_options_url(args)
+        args.client_options_query_str = client_options_query_str + '&'
+        os.environ['CLICKHOUSE_URL_PARAMS'] += client_options_query_str
+    else:
+        args.client_options_query_str = ''
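
Note: a sketch of how per-run client options reach every HTTP request, assuming the runner was started with --client-option max_threads=1 join_use_nulls=1 (values illustrative):

    import urllib.parse

    client_option = ['max_threads=1', 'join_use_nulls=1']
    client_options_query_str = '&'.join(client_option) + '&'
    # clickhouse_execute_http() prepends this string to the encoded params:
    params = {'query': 'SELECT 1', 'database': 'system'}
    url = '/?' + client_options_query_str + urllib.parse.urlencode(params)
    print(url)  # /?max_threads=1&join_use_nulls=1&query=SELECT+1&database=system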
     if args.extract_from_config is None:
         if os.access(args.binary + '-extract-from-config', os.X_OK):
@@ -1434,8 +1433,4 @@
     if args.jobs is None:
         args.jobs = multiprocessing.cpu_count()
 
-    # configure pandas to make it more like Vertical format
-    pandas.options.display.max_columns = None
-    pandas.options.display.width = None
-
     main(args)