ClickHouse/tests/sqllogic/test_runner.py
2023-07-19 12:24:31 +00:00

586 lines
20 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import enum
import logging
import os
import traceback
import io
import json
import test_parser
from exceptions import (
Error,
ProgramError,
DataResultDiffer,
StatementExecutionError,
StatementSuccess,
QueryExecutionError,
QuerySuccess,
SchemeResultDiffer,
)
from connection import execute_request
# Module-wide logger shared by the helpers and classes below. It is registered
# under the name "parser" and its level is forced to DEBUG at import time.
logger = logging.getLogger("parser")
logger.setLevel(logging.DEBUG)
def _list_files(path):
    """Yield every regular file reachable from *path*.

    A path that is itself a file is yielded as-is; anything else is scanned
    with ``os.scandir`` and each entry is walked recursively.

    Raises:
        ProgramError: if *path* is not a ``str``.
    """
    logger.debug("list files in %s, type %s", path, type(path))

    if not isinstance(path, str):
        raise ProgramError("NotImplemented")

    if not os.path.isfile(path):
        # Not a plain file: descend into it, entry by entry.
        with os.scandir(path) as entries:
            for child in entries:
                yield from _list_files(child.path)
        return

    yield path
def _filter_files(suffix, files):
yield from (path for path in files if path.endswith(suffix))
class RequestType(str, enum.Enum):
    """Kind of request a test block carries: a statement or a query."""

    # Member values are produced by enum.auto(); within this file the members
    # are only compared with each other and rendered through their ``name``.
    statement = enum.auto()
    query = enum.auto()
class Status(str, enum.Enum):
    """Outcome recorded for a single executed request."""

    # The request behaved as the test expected.
    success = "success"
    # The request did not behave as the test expected.
    error = "error"
class TestStatus:
    """Outcome of executing one request (statement or query) of a test.

    Instances are normally built from the exception objects raised by the
    runner via :meth:`from_exception`.
    """

    def __init__(self):
        self.status = None
        # ``name`` is filled in from the originating error and is read by the
        # report classes; declare it here so that every instance has it.
        self.name = None
        self.file = None
        self.position = None
        self.request_type = None
        self.request = None
        self.reason = None

    def get_map(self):
        """Return a plain-dict view of this status.

        ``status`` and ``request_type`` must already be set to enum members,
        because their lower-cased ``name`` is what gets emitted.
        """
        return {
            "status": self.status.name.lower(),
            # "file": self.file,
            "position": self.position,
            "request_type": self.request_type.name.lower(),
            "request": self.request,
            "reason": self.reason,
        }

    @staticmethod
    def __from_error(err):
        """Copy the test coordinates carried by *err* into a new instance.

        Raises:
            ProgramError: if *err* is not an ``Error``.
        """
        if isinstance(err, Error):
            result = TestStatus()
            result.name = err.test_name
            result.file = err.test_file
            result.position = err.test_pos
            result.request = err.request
            result.reason = err.reason
            return result
        raise ProgramError("NotImplemented")

    @staticmethod
    def from_exception(ex):
        """Build a status from one of the runner's outcome exceptions.

        Args:
            ex: an ``Error`` instance describing how a request ended.

        Returns:
            A ``TestStatus`` whose ``status`` and ``request_type`` are derived
            from the concrete class of *ex*.

        Raises:
            ProgramError: if *ex* is not an ``Error`` or is of a class this
                method does not know how to classify.
        """
        result = TestStatus.__from_error(ex)

        # Checked top to bottom; the first matching class wins.
        outcomes = (
            (StatementSuccess, Status.success, RequestType.statement),
            (StatementExecutionError, Status.error, RequestType.statement),
            (QuerySuccess, Status.success, RequestType.query),
            (QueryExecutionError, Status.error, RequestType.query),
            (SchemeResultDiffer, Status.error, RequestType.query),
            (DataResultDiffer, Status.error, RequestType.query),
        )
        for error_class, status, request_type in outcomes:
            if isinstance(ex, error_class):
                result.status = status
                result.request_type = request_type
                return result

        raise ProgramError("NotImplemented", parent=ex)
class SimpleStats:
    """Pair of success/fail counters, optionally mirrored into a parent.

    When a parent ``general`` counter is supplied, every change made through
    the ``success`` / ``fail`` setters is applied to the parent as a delta.
    """

    def __init__(self, general=None):
        self._general = general
        self._success = 0
        self._fail = 0

    @property
    def all(self):
        """Total number of counted requests."""
        return self.success + self.fail

    @property
    def success(self):
        return self._success

    @success.setter
    def success(self, value):
        parent = self._general
        if parent is not None:
            # Forward only the difference so the parent stays an aggregate.
            parent.success += value - self._success
        self._success = value

    @property
    def fail(self):
        return self._fail

    @fail.setter
    def fail(self, value):
        parent = self._general
        if parent is not None:
            # Forward only the difference so the parent stays an aggregate.
            parent.fail += value - self._fail
        self._fail = value

    def __repr__(self):
        return str(self.get_map())

    def update(self, status):
        """Count one ``TestStatus``: errors as failures, the rest as successes."""
        if not isinstance(status, TestStatus):
            raise ProgramError("NotImplemented")

        failed = status.status == Status.error
        if failed:
            self.fail += 1
        else:
            self.success += 1

    def get_map(self):
        """Return the counters as a plain dict."""
        return {"success": self.success, "fail": self.fail}

    def combine_with(self, right):
        """Add the counters of another ``SimpleStats`` to this one."""
        if not isinstance(right, SimpleStats):
            raise ProgramError("NotImplemented")

        self.success += right.success
        self.fail += right.fail
class Stats:
    """Statement and query counters plus a total that aggregates both."""

    def __init__(self):
        self.total = SimpleStats()
        # Both per-kind counters mirror their changes into ``total``.
        self.statements = SimpleStats(self.total)
        self.queries = SimpleStats(self.total)

    def __repr__(self):
        return str(self.get_map())

    def update(self, status):
        """Route *status* to the query or statement counter."""
        if not isinstance(status, TestStatus):
            raise ProgramError("NotImplemented")

        is_query = status.request_type == RequestType.query
        target = self.queries if is_query else self.statements
        target.update(status)

    def get_map(self):
        """Return all three counters as nested plain dicts."""
        return {
            "statements": self.statements.get_map(),
            "queries": self.queries.get_map(),
            "total": self.total.get_map(),
        }

    def combine_with(self, right):
        """Add the statement and query counters of another ``Stats``."""
        if not isinstance(right, Stats):
            raise ProgramError("NotImplemented")

        # ``total`` follows automatically through the per-kind counters.
        self.statements.combine_with(right.statements)
        self.queries.combine_with(right.queries)
class OneReport:
    """Report for a single test: its statistics and per-request statuses."""

    def __init__(self, test_name, test_file):
        self.test_name = test_name
        self.test_file = test_file
        self.stats = Stats()
        self.requests = {}  # type: dict(int, TestStatus)

    def update(self, status):
        """Count *status* and store it under its position."""
        if not isinstance(status, TestStatus):
            raise ProgramError("NotImplemented")

        self.stats.update(status)
        self.requests[status.position] = status

    def __repr__(self):
        return str(self.get_map())

    def get_map(self):
        """Return the report as nested plain dicts."""
        return {
            "test_name": self.test_name,
            "test_file": self.test_file,
            "stats": self.stats.get_map(),
            "requests": {
                position: entry.get_map()
                for position, entry in self.requests.items()
            },
        }
class Report:
    """Aggregated report for one DBMS over the tests of one input directory."""

    def __init__(self, dbms_name, input_dir=None):
        self.dbms_name = dbms_name
        self.stats = Stats()
        self.tests = {}  # type: dict(str, OneReport)
        self.input_dir = input_dir
        self.output_dir = None

    def update(self, status):
        """Count *status* globally and in the report of the test it belongs to."""
        if not isinstance(status, TestStatus):
            raise ProgramError("NotImplemented")

        self.stats.update(status)
        self.__get_file_report(status).update(status)

    def __get_file_report(self, status):
        """Return the per-test report for *status*, creating it on first use."""
        try:
            return self.tests[status.name]
        except KeyError:
            created = OneReport(status.name, status.file)
            self.tests[status.name] = created
            return created

    def __repr__(self):
        return str(self.get_map())

    def assign_result_dir(self, res_dir):
        """Remember the directory the results are written to."""
        self.output_dir = res_dir

    def get_map(self):
        """Return the whole report as nested plain dicts.

        ``input_dir`` is always present (possibly ``None``); ``output_dir`` is
        only emitted once it has been assigned.
        """
        result = {
            "dbms_name": self.dbms_name,
            "stats": self.stats.get_map(),
            "input_dir": self.input_dir,
        }
        if self.output_dir is not None:
            result["output_dir"] = self.output_dir

        per_test = {}
        result["tests"] = per_test
        for test_name, one_report in self.tests.items():
            per_test[test_name] = one_report.get_map()
        return result

    def combine_with(self, right):
        """Merge another report into this one and return ``self``.

        Raises:
            ProgramError: if *right* is not a ``Report``, targets another
                DBMS, has an unknown or different input directory, or shares
                a test name with this report.
        """
        if not isinstance(right, Report):
            raise ProgramError("NotImplemented")

        if self.dbms_name != right.dbms_name:
            raise ProgramError("reports are attached to the different databases")

        if self.input_dir is None or right.input_dir is None:
            raise ProgramError("can't compare input dirs")

        if self.input_dir != right.input_dir:
            raise ProgramError(
                "can't combine reports, they are attached to the different input dirs"
            )

        for test_name in right.tests:
            if test_name in self.tests:
                raise ProgramError(
                    f"can't combine reports, they have intersect tests, {test_name}"
                )

        self.tests.update(right.tests)
        self.stats.combine_with(right.stats)
        return self

    def write_report(self, report_dir):
        """Write the report as indented JSON to ``report.json`` in *report_dir*."""
        report_path = os.path.join(report_dir, "report.json")
        logger.info(f"create file {report_path}")
        with open(report_path, "w") as stream:
            serialized = json.dumps(self.get_map(), indent=4)
            stream.write(serialized)
class TestRunner:
    """Run parsed test files against a connection and collect the outcome.

    For every executed test the runner keeps the regenerated test text in
    ``results`` (test name -> ``io.StringIO``) and feeds each request status
    into ``report``. Both are created lazily on the first executed test.
    """

    def __init__(
        self,
        connection,
        verify_mode=None,
        skip_request_types=None,
        stop_at_statement_error=None,
    ):
        """Create a runner bound to *connection*.

        Args:
            connection: object used to execute requests; it must expose
                ``DBMS_NAME`` and ``with_one_test_scope()``.
            verify_mode: when true, actual results are compared with the
                results stored in the test; ``None`` means ``False``.
            skip_request_types: optional iterable of ``RequestType`` values
                whose blocks are dumped without being executed.
            stop_at_statement_error: when true, the rest of a test file is
                skipped after the first failed statement; ``None`` means
                ``False``.
        """
        self.connection = connection
        self.verify = False if verify_mode is None else verify_mode

        self.skip_request_types = []
        if skip_request_types is not None:
            for req_type in skip_request_types:
                self.with_skip(req_type)

        self.stop_at_statement_error = (
            False if stop_at_statement_error is None else stop_at_statement_error
        )
        self.dbms_name = connection.DBMS_NAME
        self.report = None
        self.results = None
        self._input_dir = None

    def with_verify_mode(self):
        """Switch to verify mode and return ``self`` for chaining."""
        self.verify = True
        return self

    def with_completion_mode(self):
        """Switch to completion (non-verifying) mode and return ``self``."""
        self.verify = False
        return self

    def with_skip(self, type_request):
        """Skip blocks of the given ``RequestType`` and return ``self``.

        Values other than ``RequestType.query`` / ``RequestType.statement``
        are ignored.
        """
        if type_request == RequestType.query:
            self.skip_request_types.append(test_parser.BlockType.query)
        elif type_request == RequestType.statement:
            self.skip_request_types.append(test_parser.BlockType.statement)
        # Returned for consistency with the other ``with_*`` builders.
        return self

    def __statuses(self, parser, out_stream):
        """Execute the blocks of *parser* and yield one ``TestStatus`` each.

        Every block, executed or skipped, is dumped to *out_stream*. Comment
        and control blocks, blocks whose conditions do not match this DBMS and
        blocks of a skipped request type are dumped without producing a
        status.

        The outcome of a request is signalled internally by raising one of the
        ``*Success`` / ``*Error`` exceptions, which is then converted with
        ``TestStatus.from_exception``.
        """
        skip_rest = False

        for block in parser.test_blocks():
            test_file = parser.get_test_file()
            test_name = parser.get_test_name()
            position = block.get_pos()
            name_pos = f"{test_name}:{position}"

            clogger = logging.getLogger(f"parser at {name_pos}")

            if skip_rest:
                clogger.debug("Skip rest blocks")
                block.dump_to(out_stream)
                continue

            if block.get_block_type() == test_parser.BlockType.comments:
                clogger.debug("Skip comment block")
                block.dump_to(out_stream)
                continue

            if block.get_block_type() == test_parser.BlockType.control:
                clogger.debug("Skip control block %s", name_pos)
                block.dump_to(out_stream)
                continue

            clogger.debug("Request <%s>", block.get_request())

            cond_lines = block.get_conditions()
            if not test_parser.check_conditions(cond_lines, self.dbms_name):
                clogger.debug("Conditionally skip block for %s", self.dbms_name)
                block.dump_to(out_stream)
                continue

            request = block.get_request()

            if block.get_block_type() in self.skip_request_types:
                clogger.debug("Runtime skip block for %s", self.dbms_name)
                block.dump_to(out_stream)
                continue

            exec_res = execute_request(request, self.connection)

            if block.get_block_type() == test_parser.BlockType.statement:
                try:
                    clogger.debug("this is statement")
                    if block.expected_error():
                        clogger.debug("error is expected")
                        if not exec_res.has_exception():
                            raise StatementExecutionError(
                                "statement request did not fail as expected"
                            )
                    else:
                        clogger.debug("ok is expected")
                        if exec_res.has_exception():
                            raise StatementExecutionError(
                                "statement failed with exception",
                                parent=exec_res.get_exception(),
                            )
                    raise StatementSuccess()
                except StatementSuccess as ok:
                    clogger.debug("statement is ok")
                    ok.set_details(
                        file=test_file, name=test_name, pos=position, request=request
                    )
                    block.dump_to(out_stream)
                    yield TestStatus.from_exception(ok)
                except StatementExecutionError as err:
                    err.set_details(
                        file=test_file, name=test_name, pos=position, request=request
                    )
                    clogger.critical("Unable to execute statement, %s", err.reason)
                    block.dump_to(out_stream)
                    if self.stop_at_statement_error:
                        clogger.critical("Will skip the rest of the file")
                        skip_rest = True
                    yield TestStatus.from_exception(err)

            if block.get_block_type() == test_parser.BlockType.query:
                try:
                    clogger.debug("this is query")
                    expected_error = block.expected_error()
                    if expected_error:
                        clogger.debug("error is expected %s", expected_error)
                        if exec_res.has_exception():
                            e = exec_res.get_exception()
                            clogger.debug("had error %s", e)
                            # NOTE(review): only the actual message is
                            # lower-cased; this presumably relies on the
                            # expected text already being lower case.
                            message = str(e).lower()
                            if expected_error not in message:
                                clogger.debug("errors differed")
                                raise QueryExecutionError(
                                    "query is expected to fail with different error",
                                    details=f"expected error: {expected_error}",
                                    parent=exec_res.get_exception(),
                                )
                            else:
                                clogger.debug("errors matched")
                                raise QuerySuccess()
                        else:
                            clogger.debug("missed error")
                            raise QueryExecutionError(
                                "query is expected to fail with error",
                                details="expected error: {}".format(expected_error),
                            )
                    else:
                        clogger.debug("success is expected")
                        if exec_res.has_exception():
                            clogger.debug("had error")
                            if self.verify:
                                clogger.debug("verify mode")
                                canonic = test_parser.QueryResult.parse_it(
                                    block.get_result(), 10
                                )
                                exception = QueryExecutionError(
                                    "query execution failed with an exception",
                                    parent=exec_res.get_exception(),
                                )
                                actual = test_parser.QueryResult.as_exception(exception)
                                test_parser.QueryResult.assert_eq(canonic, actual)
                                block.with_result(actual)
                                raise QuerySuccess()
                            else:
                                clogger.debug("completion mode")
                                raise QueryExecutionError(
                                    "query execution failed with an exception",
                                    parent=exec_res.get_exception(),
                                )

                    # Only reached when the query was expected to succeed and
                    # did: every other path above has already raised.
                    canonic_types = block.get_types()
                    clogger.debug("canonic types %s", canonic_types)

                    if len(exec_res.get_result()) > 0:
                        actual_columns_count = len(exec_res.get_result()[0])
                        canonic_columns_count = len(canonic_types)
                        if canonic_columns_count != actual_columns_count:
                            raise SchemeResultDiffer(
                                "canonic and actual columns count differ",
                                details="expected columns {}, actual columns {}".format(
                                    canonic_columns_count, actual_columns_count
                                ),
                            )

                    actual = test_parser.QueryResult.make_it(
                        exec_res.get_result(), canonic_types, block.get_sort_mode(), 10
                    )

                    if self.verify:
                        clogger.debug("verify mode")
                        canonic = test_parser.QueryResult.parse_it(
                            block.get_result(), 10
                        )
                        test_parser.QueryResult.assert_eq(canonic, actual)

                    block.with_result(actual)
                    raise QuerySuccess()

                except QuerySuccess as ok:
                    ok.set_details(
                        file=test_file, name=test_name, pos=position, request=request
                    )
                    clogger.debug("query ok")
                    block.dump_to(out_stream)
                    yield TestStatus.from_exception(ok)

                except Error as err:
                    err.set_details(
                        file=test_file, name=test_name, pos=position, request=request
                    )
                    clogger.warning(
                        "Query has failed with exception: %s",
                        err.reason,
                    )
                    # The failure itself becomes the stored result of the block.
                    block.with_result(test_parser.QueryResult.as_exception(err))
                    block.dump_to(out_stream)
                    yield TestStatus.from_exception(err)

    def run_one_test(self, stream, test_name, test_file):
        """Run the test read from *stream* and record its results.

        Args:
            stream: readable text stream with the test content.
            test_name: key under which the results are stored.
            test_file: path of the test file; it must start with the input
                directory this runner is already attached to, if any.

        Raises:
            ProgramError: if *test_file* is outside the attached input dir.
        """
        if self._input_dir is not None:
            if not test_file.startswith(self._input_dir):
                raise ProgramError(
                    f"that runner instance is attached to tests in dir {self._input_dir}"
                    f", can't run with file {test_file}"
                )
        else:
            self._input_dir = os.path.dirname(test_file)

        if self.report is None:
            self.report = Report(self.dbms_name, self._input_dir)

        if self.results is None:
            self.results = {}

        with self.connection.with_one_test_scope():
            out_stream = io.StringIO()
            self.results[test_name] = out_stream

            parser = test_parser.TestFileParser(stream, test_name, test_file)
            for status in self.__statuses(parser, out_stream):
                self.report.update(status)

    def _assert_input_dir(self, input_dir):
        """Raise ``ProgramError`` if the runner is attached to another dir."""
        if self._input_dir is not None:
            if self._input_dir != input_dir:
                raise ProgramError(
                    f"that runner instance is attached to tests in dir {self._input_dir}"
                    f", can't run with {input_dir}"
                )

    def run_all_tests_from_file(self, test_file, input_dir=None):
        """Run the test stored in *test_file*.

        The test name is the path of *test_file* relative to *input_dir*, or
        to the directory of *test_file* when *input_dir* is ``None``.
        """
        self._assert_input_dir(input_dir)
        self._input_dir = input_dir
        if self._input_dir is None:
            self._input_dir = os.path.dirname(test_file)

        test_name = os.path.relpath(test_file, start=self._input_dir)
        logger.debug("open file %s", test_name)
        # Explicit encoding: do not depend on the locale of the host.
        with open(test_file, "r", encoding="utf-8") as stream:
            self.run_one_test(stream, test_name, test_file)

    def run_all_tests_from_dir(self, input_dir):
        """Run every ``.test`` file found under *input_dir*."""
        self._assert_input_dir(input_dir)
        self._input_dir = input_dir
        for file_path in TestRunner.list_tests(self._input_dir):
            self.run_all_tests_from_file(file_path, self._input_dir)

    def write_results_to_dir(self, dir_path):
        """Write the regenerated text of every executed test under *dir_path*.

        Each test is written to ``dir_path/<test name>``; missing
        sub-directories are created.

        Raises:
            NotADirectoryError: if *dir_path* is not an existing directory.
        """
        if not os.path.isdir(dir_path):
            raise NotADirectoryError(dir_path)

        self.report.assign_result_dir(dir_path)

        for test_name, stream in self.results.items():
            test_file = os.path.join(dir_path, test_name)
            logger.info("create file %s", test_file)
            result_dir = os.path.dirname(test_file)
            os.makedirs(result_dir, exist_ok=True)
            # Explicit encoding: do not depend on the locale of the host.
            with open(test_file, "w", encoding="utf-8") as output:
                output.write(stream.getvalue())

    def write_report(self, report_dir):
        """Write the JSON report into *report_dir*."""
        self.report.write_report(report_dir)

    @staticmethod
    def list_tests(input_dir):
        """Yield the paths of all ``.test`` files found under *input_dir*."""
        yield from _filter_files(".test", _list_files(input_dir))