mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 01:25:21 +00:00
585 lines
20 KiB
Python
585 lines
20 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
import enum
|
|
import logging
|
|
import os
|
|
import traceback
|
|
import io
|
|
import json
|
|
|
|
import test_parser
|
|
from exceptions import (
|
|
Error,
|
|
ProgramError,
|
|
DataResultDiffer,
|
|
StatementExecutionError,
|
|
StatementSuccess,
|
|
QueryExecutionError,
|
|
QuerySuccess,
|
|
SchemeResultDiffer,
|
|
)
|
|
from connection import execute_request
|
|
|
|
|
|
# Module-wide logger; it is named "parser" and emits everything from DEBUG up.
logger = logging.getLogger("parser")
logger.setLevel(logging.DEBUG)
|
|
|
|
|
|
def _list_files(path):
    """Yield the path of every regular file found at or below *path*.

    When *path* is itself a file it is yielded as-is; otherwise it is scanned
    with ``os.scandir`` and each entry is explored recursively.

    Raises:
        ProgramError: if *path* is not a ``str`` (raised lazily, on the first
            iteration, because this is a generator).
    """
    logger.debug("list files in %s, type %s", path, type(path))

    if not isinstance(path, str):
        raise ProgramError("NotImplemented")

    if not os.path.isfile(path):
        # Anything that is not a regular file is treated as a directory.
        with os.scandir(path) as entries:
            for entry in entries:
                yield from _list_files(entry.path)
        return

    yield path
|
|
|
|
|
|
def _filter_files(suffix, files):
|
|
yield from (path for path in files if path.endswith(suffix))
|
|
|
|
|
|
class RequestType(str, enum.Enum):
    """Kind of request found in a test file: a statement or a query."""

    # Values are auto-generated; the rest of the module relies on member
    # identity and on ``.name``, not on the concrete values.
    statement = enum.auto()
    query = enum.auto()
|
|
|
|
|
|
class Status(str, enum.Enum):
    """Final outcome of one executed request."""

    success = "success"
    error = "error"
|
|
|
|
|
|
class TestStatus:
    """Outcome of a single request (statement or query) of a test.

    Instances are normally built with :meth:`from_exception` from one of the
    ``Error`` subclasses that carry the test name, file, position, request and
    reason.
    """

    def __init__(self):
        # ``name`` is initialised here as well so that every instance exposes
        # it; Report looks reports up by ``status.name``.
        self.name = None
        self.status = None
        self.file = None
        self.position = None
        self.request_type = None
        self.request = None
        self.reason = None

    def get_map(self):
        """Return a plain ``dict`` describing this status.

        ``status`` and ``request_type`` must already be set: their lower-cased
        ``name`` is what gets emitted.
        """
        return {
            "status": self.status.name.lower(),
            # The file path is not emitted per request; OneReport.get_map()
            # reports "test_file" once per test.
            "position": self.position,
            "request_type": self.request_type.name.lower(),
            "request": self.request,
            "reason": self.reason,
        }

    @staticmethod
    def __from_error(err):
        """Copy the test details carried by *err* into a new ``TestStatus``.

        Raises:
            ProgramError: if *err* is not an ``Error`` instance.
        """
        if not isinstance(err, Error):
            raise ProgramError("NotImplemented")

        result = TestStatus()
        result.name = err.test_name
        result.file = err.test_file
        result.position = err.test_pos
        result.request = err.request
        result.reason = err.reason
        return result

    @staticmethod
    def from_exception(ex):
        """Build a ``TestStatus`` from one of the known outcome exceptions.

        Raises:
            ProgramError: if *ex* is not an ``Error`` or is an ``Error`` of a
                type that has no status/request-type mapping.
        """
        result = TestStatus.__from_error(ex)

        # Checked in order, first match wins (same precedence as an
        # if/elif chain of isinstance tests).
        outcomes = (
            (StatementSuccess, Status.success, RequestType.statement),
            (StatementExecutionError, Status.error, RequestType.statement),
            (QuerySuccess, Status.success, RequestType.query),
            (QueryExecutionError, Status.error, RequestType.query),
            (SchemeResultDiffer, Status.error, RequestType.query),
            (DataResultDiffer, Status.error, RequestType.query),
        )
        for exc_type, status, request_type in outcomes:
            if isinstance(ex, exc_type):
                result.status = status
                result.request_type = request_type
                return result

        raise ProgramError("NotImplemented", parent=ex)
|
|
|
|
|
|
class SimpleStats:
    """Pair of success/fail counters, optionally mirrored into a parent.

    When *general* is given, every change made through the ``success`` or
    ``fail`` setter is also applied, as a delta, to that parent object.
    """

    def __init__(self, general=None):
        self._general = general
        self._success = 0
        self._fail = 0

    @property
    def all(self):
        """Total number of counted requests (successes plus failures)."""
        return self._success + self.fail

    @property
    def success(self):
        """Number of successful requests."""
        return self._success

    @success.setter
    def success(self, value):
        parent = self._general
        if parent is not None:
            # Forward only the difference so the parent keeps an aggregate.
            delta = value - self._success
            parent.success += delta
        self._success = value

    @property
    def fail(self):
        """Number of failed requests."""
        return self._fail

    @fail.setter
    def fail(self, value):
        parent = self._general
        if parent is not None:
            delta = value - self._fail
            parent.fail += delta
        self._fail = value

    def __repr__(self):
        return str(self.get_map())

    def update(self, status):
        """Count one ``TestStatus``: errors as failures, the rest as successes.

        Raises:
            ProgramError: if *status* is not a ``TestStatus``.
        """
        if not isinstance(status, TestStatus):
            raise ProgramError("NotImplemented")

        failed = status.status == Status.error
        if failed:
            self.fail += 1
        else:
            self.success += 1

    def get_map(self):
        """Return the counters as ``{"success": ..., "fail": ...}``."""
        return {"success": self.success, "fail": self.fail}

    def combine_with(self, right):
        """Add the counters of another ``SimpleStats`` to this one.

        Raises:
            ProgramError: if *right* is not a ``SimpleStats``.
        """
        if not isinstance(right, SimpleStats):
            raise ProgramError("NotImplemented")
        self.success += right.success
        self.fail += right.fail
|
|
|
|
|
|
class Stats:
    """Success/fail counters split by request type, plus a running total.

    ``statements`` and ``queries`` are both created with ``total`` as their
    parent, so updating either of them also updates ``total``.
    """

    def __init__(self):
        self.total = SimpleStats()
        self.statements = SimpleStats(self.total)
        self.queries = SimpleStats(self.total)

    def __repr__(self):
        return str(self.get_map())

    def update(self, status):
        """Count *status* in the bucket matching its request type.

        Anything that is not a query is counted as a statement.

        Raises:
            ProgramError: if *status* is not a ``TestStatus``.
        """
        if not isinstance(status, TestStatus):
            raise ProgramError("NotImplemented")

        is_query = status.request_type == RequestType.query
        bucket = self.queries if is_query else self.statements
        bucket.update(status)

    def get_map(self):
        """Return the three counter maps keyed by bucket name."""
        return {
            "statements": self.statements.get_map(),
            "queries": self.queries.get_map(),
            "total": self.total.get_map(),
        }

    def combine_with(self, right):
        """Add the statement and query counters of another ``Stats``.

        ``total`` is not combined directly; it follows through the
        per-bucket counters.

        Raises:
            ProgramError: if *right* is not a ``Stats``.
        """
        if not isinstance(right, Stats):
            raise ProgramError("NotImplemented")
        self.statements.combine_with(right.statements)
        self.queries.combine_with(right.queries)
|
|
|
|
|
|
class OneReport:
    """Report for a single test: its statistics and per-position statuses."""

    def __init__(self, test_name, test_file):
        self.test_name = test_name
        self.test_file = test_file
        self.stats = Stats()
        self.requests = {}  # type: dict(int, TestStatus)

    def update(self, status):
        """Record *status* in the statistics and store it under its position.

        Raises:
            ProgramError: if *status* is not a ``TestStatus``.
        """
        if not isinstance(status, TestStatus):
            raise ProgramError("NotImplemented")

        self.stats.update(status)
        self.requests[status.position] = status

    def __repr__(self):
        return str(self.get_map())

    def get_map(self):
        """Return this report as a nested ``dict``."""
        return {
            "test_name": self.test_name,
            "test_file": self.test_file,
            "stats": self.stats.get_map(),
            "requests": {
                position: request_status.get_map()
                for position, request_status in self.requests.items()
            },
        }
|
|
|
|
|
|
class Report:
    """Aggregated report for one DBMS: overall statistics plus one
    ``OneReport`` per test name."""

    def __init__(self, dbms_name, input_dir=None):
        self.dbms_name = dbms_name
        self.stats = Stats()
        self.tests = {}  # type: dict(str, OneReport)
        self.input_dir = input_dir
        self.output_dir = None

    def update(self, status):
        """Count *status* globally and in the report of its test.

        Raises:
            ProgramError: if *status* is not a ``TestStatus``.
        """
        if not isinstance(status, TestStatus):
            raise ProgramError("NotImplemented")

        self.stats.update(status)
        self.__get_file_report(status).update(status)

    def __get_file_report(self, status):
        """Return the ``OneReport`` for ``status.name``, creating it on demand."""
        test_name = status.name
        if test_name not in self.tests:
            self.tests[test_name] = OneReport(test_name, status.file)
        return self.tests[test_name]

    def __repr__(self):
        return str(self.get_map())

    def assign_result_dir(self, res_dir):
        """Remember the directory the results were written to."""
        self.output_dir = res_dir

    def get_map(self):
        """Return the whole report as a nested ``dict``.

        ``input_dir`` is always present (possibly ``None``); ``output_dir`` is
        only present once it has been assigned.
        """
        result = {
            "dbms_name": self.dbms_name,
            "stats": self.stats.get_map(),
            "input_dir": self.input_dir,
        }
        if self.output_dir is not None:
            result["output_dir"] = self.output_dir
        result["tests"] = {
            test_name: one_report.get_map()
            for test_name, one_report in self.tests.items()
        }
        return result

    def combine_with(self, right):
        """Merge another ``Report`` into this one and return ``self``.

        Raises:
            ProgramError: if *right* is not a ``Report``, if the DBMS names
                differ, if either input dir is missing, if the input dirs
                differ, or if both reports contain the same test name.
        """
        if not isinstance(right, Report):
            raise ProgramError("NotImplemented")

        if self.dbms_name != right.dbms_name:
            raise ProgramError("reports are attached to the different databases")

        if self.input_dir is None or right.input_dir is None:
            raise ProgramError("can't compare input dirs")

        if self.input_dir != right.input_dir:
            raise ProgramError(
                "can't combine reports, they are attached to the different input dirs"
            )

        # Refuse to merge when a test appears on both sides.
        for test_name in right.tests:
            if test_name in self.tests:
                raise ProgramError(
                    f"can't combine reports, they have intersect tests, {test_name}"
                )

        self.tests.update(right.tests)
        self.stats.combine_with(right.stats)
        return self

    def write_report(self, report_dir):
        """Write the report as indented JSON to ``<report_dir>/report.json``."""
        report_path = os.path.join(report_dir, "report.json")
        logger.info(f"create file {report_path}")
        with open(report_path, "w") as stream:
            serialized = json.dumps(self.get_map(), indent=4)
            stream.write(serialized)
|
|
|
|
|
|
class TestRunner:
    """Runs test files against one connection and collects the outcome.

    For every test it keeps the re-dumped test text (``self.results``, one
    ``io.StringIO`` per test name) and feeds a ``Report`` (``self.report``)
    with one ``TestStatus`` per executed statement or query.

    In verify mode query results are compared with the results stored in the
    test file; otherwise (completion mode) the actual results are simply
    recorded into the dumped blocks.
    """

    def __init__(
        self,
        connection,
        verify_mode=None,
        skip_request_types=None,
        stop_at_statement_error=None,
    ):
        """Create a runner bound to *connection*.

        Args:
            connection: object used to execute requests; it must expose
                ``DBMS_NAME`` and ``with_one_test_scope()``.
            verify_mode: enable verify mode; ``None`` means ``False``.
            skip_request_types: optional iterable of ``RequestType`` values
                whose blocks must not be checked or reported.
            stop_at_statement_error: when true, the rest of a test file is
                skipped after the first failed statement; ``None`` means
                ``False``.
        """
        self.connection = connection
        self.verify = False if verify_mode is None else verify_mode
        self.skip_request_types = []
        if skip_request_types is not None:
            for req_type in skip_request_types:
                self.with_skip(req_type)
        self.stop_at_statement_error = (
            False if stop_at_statement_error is None else stop_at_statement_error
        )

        self.dbms_name = connection.DBMS_NAME
        self.report = None
        self.results = None
        self._input_dir = None

    def with_verify_mode(self):
        """Switch to verify mode and return ``self`` for chaining."""
        self.verify = True
        return self

    def with_completion_mode(self):
        """Switch to completion mode and return ``self`` for chaining."""
        self.verify = False
        return self

    def with_skip(self, type_request):
        """Register a ``RequestType`` whose blocks are skipped at runtime.

        The request type is translated to the matching
        ``test_parser.BlockType``; other values are ignored.
        """
        if type_request == RequestType.query:
            self.skip_request_types.append(test_parser.BlockType.query)
        if type_request == RequestType.statement:
            self.skip_request_types.append(test_parser.BlockType.statement)

    def __statuses(self, parser, out_stream):
        """Process every block of *parser*, yielding a ``TestStatus`` for each
        statement or query that was checked.

        Blocks are dumped to *out_stream* as they are processed, whether they
        were skipped, succeeded or failed.
        """
        skip_rest = False

        for block in parser.test_blocks():
            test_file = parser.get_test_file()
            test_name = parser.get_test_name()
            position = block.get_pos()
            name_pos = f"{test_name}:{position}"

            # Per-block logger so that each message carries the test position.
            clogger = logging.getLogger(f"parser at {name_pos}")

            if skip_rest:
                clogger.debug("Skip rest blocks")
                block.dump_to(out_stream)
                continue

            if block.get_block_type() == test_parser.BlockType.comments:
                clogger.debug("Skip comment block")
                block.dump_to(out_stream)
                continue

            if block.get_block_type() == test_parser.BlockType.control:
                # No extra logging argument here: the message has no
                # placeholder, and the position is already in the logger name.
                clogger.debug("Skip control block")
                block.dump_to(out_stream)
                continue

            clogger.debug("Request <%s>", block.get_request())

            cond_lines = block.get_conditions()
            if not test_parser.check_conditions(cond_lines, self.dbms_name):
                clogger.debug("Conditionally skip block for %s", self.dbms_name)
                block.dump_to(out_stream)
                continue

            request = block.get_request()
            exec_res = execute_request(request, self.connection)

            # NOTE(review): the request is executed before this check, so
            # blocks of a skipped type still run against the connection but
            # are neither verified nor reported - presumably so that later
            # blocks see their side effects; confirm this is intended.
            if block.get_block_type() in self.skip_request_types:
                clogger.debug("Runtime skip block for %s", self.dbms_name)
                block.dump_to(out_stream)
                continue

            if block.get_block_type() == test_parser.BlockType.statement:
                # The outcome is signalled by raising: StatementSuccess on the
                # happy path, StatementExecutionError otherwise.
                try:
                    clogger.debug("this is statement")
                    if block.expected_error():
                        clogger.debug("error is expected")
                        if not exec_res.has_exception():
                            raise StatementExecutionError(
                                "statement request did not fail as expected"
                            )
                    else:
                        clogger.debug("ok is expected")
                        if exec_res.has_exception():
                            raise StatementExecutionError(
                                "statement failed with exception",
                                parent=exec_res.get_exception(),
                            )
                    raise StatementSuccess()
                except StatementSuccess as ok:
                    clogger.debug("statement is ok")
                    ok.set_details(
                        file=test_file, name=test_name, pos=position, request=request
                    )
                    block.dump_to(out_stream)
                    yield TestStatus.from_exception(ok)
                except StatementExecutionError as err:
                    err.set_details(
                        file=test_file, name=test_name, pos=position, request=request
                    )
                    clogger.critical("Unable to execute statement, %s", err.reason)
                    block.dump_to(out_stream)
                    if self.stop_at_statement_error:
                        clogger.critical("Will skip the rest of the file")
                        skip_rest = True
                    yield TestStatus.from_exception(err)

            if block.get_block_type() == test_parser.BlockType.query:
                # Same convention as for statements: every path through the
                # try body ends by raising QuerySuccess or an Error.
                try:
                    clogger.debug("this is query")
                    expected_error = block.expected_error()
                    if expected_error:
                        clogger.debug("error is expected %s", expected_error)
                        if exec_res.has_exception():
                            e = exec_res.get_exception()
                            clogger.debug("had error %s", e)
                            # The expected text is searched in the lower-cased
                            # exception message.
                            message = str(e).lower()
                            if expected_error not in message:
                                clogger.debug("errors differed")
                                raise QueryExecutionError(
                                    "query is expected to fail with different error",
                                    details=f"expected error: {expected_error}",
                                    parent=exec_res.get_exception(),
                                )
                            else:
                                clogger.debug("errors matched")
                                raise QuerySuccess()
                        else:
                            clogger.debug("missed error")
                            raise QueryExecutionError(
                                "query is expected to fail with error",
                                details="expected error: {}".format(expected_error),
                            )
                    else:
                        clogger.debug("success is expected")
                        if exec_res.has_exception():
                            clogger.debug("had error")
                            if self.verify:
                                clogger.debug("verify mode")
                                # In verify mode the stored result may itself
                                # be an exception rendering: compare with it.
                                canonic = test_parser.QueryResult.parse_it(
                                    block.get_result(), 10
                                )
                                exception = QueryExecutionError(
                                    "query execution failed with an exception",
                                    parent=exec_res.get_exception(),
                                )
                                actual = test_parser.QueryResult.as_exception(exception)
                                test_parser.QueryResult.assert_eq(canonic, actual)
                                block.with_result(actual)
                                raise QuerySuccess()
                            else:
                                clogger.debug("completion mode")
                                raise QueryExecutionError(
                                    "query execution failed with an exception",
                                    parent=exec_res.get_exception(),
                                )

                    # From here on: no error expected and none happened.
                    canonic_types = block.get_types()
                    clogger.debug("canonic types %s", canonic_types)

                    if len(exec_res.get_result()) > 0:
                        actual_columns_count = len(exec_res.get_result()[0])
                        canonic_columns_count = len(canonic_types)
                        if canonic_columns_count != actual_columns_count:
                            raise SchemeResultDiffer(
                                "canonic and actual columns count differ",
                                details="expected columns {}, actual columns {}".format(
                                    canonic_columns_count, actual_columns_count
                                ),
                            )

                    actual = test_parser.QueryResult.make_it(
                        exec_res.get_result(), canonic_types, block.get_sort_mode(), 10
                    )

                    if self.verify:
                        clogger.debug("verify mode")
                        canonic = test_parser.QueryResult.parse_it(
                            block.get_result(), 10
                        )
                        test_parser.QueryResult.assert_eq(canonic, actual)

                    block.with_result(actual)
                    raise QuerySuccess()

                except QuerySuccess as ok:
                    ok.set_details(
                        file=test_file, name=test_name, pos=position, request=request
                    )
                    clogger.debug("query ok")
                    block.dump_to(out_stream)
                    yield TestStatus.from_exception(ok)
                except Error as err:
                    err.set_details(
                        file=test_file, name=test_name, pos=position, request=request
                    )
                    clogger.warning(
                        "Query has failed with exception: %s",
                        err.reason,
                    )
                    # The dumped block gets the error rendering as its result.
                    block.with_result(test_parser.QueryResult.as_exception(err))
                    block.dump_to(out_stream)
                    yield TestStatus.from_exception(err)

    def run_one_test(self, stream, test_name, test_file):
        """Run the test read from *stream* and record its results.

        The first call fixes the runner's input dir (the directory of
        *test_file*) unless one is already set.

        Raises:
            ProgramError: if the runner is already attached to an input dir
                and *test_file* does not start with it.
        """
        if self._input_dir is not None:
            if not test_file.startswith(self._input_dir):
                raise ProgramError(
                    f"that runner instance is attached to tests in dir {self._input_dir}"
                    f", can't run with file {test_file}"
                )
        else:
            self._input_dir = os.path.dirname(test_file)

        if self.report is None:
            self.report = Report(self.dbms_name, self._input_dir)

        if self.results is None:
            self.results = dict()

        with self.connection.with_one_test_scope():
            out_stream = io.StringIO()
            self.results[test_name] = out_stream

            parser = test_parser.TestFileParser(stream, test_name, test_file)
            for status in self.__statuses(parser, out_stream):
                self.report.update(status)

    def _assert_input_dir(self, input_dir):
        """Raise ``ProgramError`` if the runner is attached to another dir.

        Nothing happens while no input dir has been set yet.
        """
        if self._input_dir is not None:
            if self._input_dir != input_dir:
                raise ProgramError(
                    f"that runner instance is attached to tests in dir {self._input_dir}"
                    f", can't run with {input_dir}"
                )

    def run_all_tests_from_file(self, test_file, input_dir=None):
        """Open *test_file* and run it.

        The test name is the path of *test_file* relative to the input dir;
        when *input_dir* is ``None`` the directory of *test_file* is used.

        Raises:
            ProgramError: if the runner is attached to a different input dir.
        """
        self._assert_input_dir(input_dir)
        self._input_dir = input_dir
        if self._input_dir is None:
            self._input_dir = os.path.dirname(test_file)

        test_name = os.path.relpath(test_file, start=self._input_dir)
        logger.debug("open file %s", test_name)
        with open(test_file, "r") as stream:
            self.run_one_test(stream, test_name, test_file)

    def run_all_tests_from_dir(self, input_dir):
        """Run every ``.test`` file found under *input_dir*.

        Raises:
            ProgramError: if the runner is attached to a different input dir.
        """
        self._assert_input_dir(input_dir)
        self._input_dir = input_dir
        for file_path in TestRunner.list_tests(self._input_dir):
            self.run_all_tests_from_file(file_path, self._input_dir)

    def write_results_to_dir(self, dir_path):
        """Write the dumped text of every test under *dir_path*.

        Files are named after the test names; missing sub-directories are
        created. The directory is also recorded in the report.

        Raises:
            NotADirectoryError: if *dir_path* is not an existing directory.
        """
        if not os.path.isdir(dir_path):
            raise NotADirectoryError(dir_path)

        self.report.assign_result_dir(dir_path)

        for test_name, stream in self.results.items():
            test_file = os.path.join(dir_path, test_name)
            logger.info(f"create file {test_file}")
            result_dir = os.path.dirname(test_file)
            os.makedirs(result_dir, exist_ok=True)
            with open(test_file, "w") as output:
                output.write(stream.getvalue())

    def write_report(self, report_dir):
        """Write the collected report into *report_dir*."""
        self.report.write_report(report_dir)

    @staticmethod
    def list_tests(input_dir):
        """Yield the paths of all files ending in ``.test`` under *input_dir*."""
        yield from _filter_files(".test", _list_files(input_dir))
|