diff --git a/tests/clickhouse-test b/tests/clickhouse-test index 676cf637385..af001bb9772 100755 --- a/tests/clickhouse-test +++ b/tests/clickhouse-test @@ -1,5 +1,7 @@ #!/usr/bin/env python3 +# pylint: disable=too-many-return-statements +import enum import shutil import sys import os @@ -85,137 +87,6 @@ def get_db_engine(args, database_name): return "" # Will use default engine -def configure_testcase_args(args, case_file, suite_tmp_dir, stderr_file): - testcase_args = copy.deepcopy(args) - - testcase_args.testcase_start_time = datetime.now() - testcase_basename = os.path.basename(case_file) - testcase_args.testcase_client = f"{testcase_args.client} --log_comment='{testcase_basename}'" - - if testcase_args.database: - database = testcase_args.database - os.environ.setdefault("CLICKHOUSE_DATABASE", database) - os.environ.setdefault("CLICKHOUSE_TMP", suite_tmp_dir) - else: - # If --database is not specified, we will create temporary database with unique name - # And we will recreate and drop it for each test - def random_str(length=6): - alphabet = string.ascii_lowercase + string.digits - return ''.join(random.choice(alphabet) for _ in range(length)) - database = 'test_{suffix}'.format(suffix=random_str()) - - with open(stderr_file, 'w') as stderr: - client_cmd = testcase_args.testcase_client + " " \ - + get_additional_client_options(args) - - clickhouse_proc_create = open_client_process( - universal_newlines=True, - client_args=client_cmd, - stderr_file=stderr) - - try: - clickhouse_proc_create.communicate(("CREATE DATABASE " + database + get_db_engine(testcase_args, database)), timeout=testcase_args.timeout) - except TimeoutExpired: - total_time = (datetime.now() - testcase_args.testcase_start_time).total_seconds() - return clickhouse_proc_create, "", "Timeout creating database {} before test".format(database), total_time - - os.environ["CLICKHOUSE_DATABASE"] = database - # Set temporary directory to match the randomly generated database, - # because .sh tests also use it for temporary files and we want to avoid - # collisions. - testcase_args.test_tmp_dir = os.path.join(suite_tmp_dir, database) - os.mkdir(testcase_args.test_tmp_dir) - os.environ.setdefault("CLICKHOUSE_TMP", testcase_args.test_tmp_dir) - - testcase_args.testcase_database = database - - return testcase_args - -def run_single_test(args, ext, server_logs_level, client_options, case_file, stdout_file, stderr_file): - client = args.testcase_client - start_time = args.testcase_start_time - database = args.testcase_database - - # This is for .sh tests - os.environ["CLICKHOUSE_LOG_COMMENT"] = case_file - - params = { - 'client': client + ' --database=' + database, - 'logs_level': server_logs_level, - 'options': client_options, - 'test': case_file, - 'stdout': stdout_file, - 'stderr': stderr_file, - } - - # >> append to stderr (but not stdout since it is not used there), - # because there are also output of per test database creation - if not args.database: - pattern = '{test} > {stdout} 2>> {stderr}' - else: - pattern = '{test} > {stdout} 2> {stderr}' - - if ext == '.sql': - pattern = "{client} --send_logs_level={logs_level} --testmode --multiquery {options} < " + pattern - - command = pattern.format(**params) - - proc = Popen(command, shell=True, env=os.environ) - - while (datetime.now() - start_time).total_seconds() < args.timeout and proc.poll() is None: - sleep(0.01) - - need_drop_database = not args.database - if need_drop_database and args.no_drop_if_fail: - maybe_passed = (proc.returncode == 0) and (proc.stderr is None) and (proc.stdout is None or 'Exception' not in proc.stdout) - need_drop_database = not maybe_passed - - if need_drop_database: - with open(stderr_file, 'a') as stderr: - clickhouse_proc_create = open_client_process(client, universal_newlines=True, stderr_file=stderr) - - seconds_left = max(args.timeout - (datetime.now() - start_time).total_seconds(), 20) - - try: - drop_database_query = "DROP DATABASE " + database - if args.replicated_database: - drop_database_query += " ON CLUSTER test_cluster_database_replicated" - clickhouse_proc_create.communicate((drop_database_query), timeout=seconds_left) - except TimeoutExpired: - # kill test process because it can also hung - if proc.returncode is None: - try: - proc.kill() - except OSError as e: - if e.errno != ESRCH: - raise - - total_time = (datetime.now() - start_time).total_seconds() - return clickhouse_proc_create, "", f"Timeout dropping database {database} after test", total_time - - shutil.rmtree(args.test_tmp_dir) - - total_time = (datetime.now() - start_time).total_seconds() - - # Normalize randomized database names in stdout, stderr files. - os.system("LC_ALL=C sed -i -e 's/{test_db}/default/g' {file}".format(test_db=database, file=stdout_file)) - if args.hide_db_name: - os.system("LC_ALL=C sed -i -e 's/{test_db}/default/g' {file}".format(test_db=database, file=stderr_file)) - if args.replicated_database: - os.system("LC_ALL=C sed -i -e 's|/auto_{{shard}}||g' {file}".format(file=stdout_file)) - os.system("LC_ALL=C sed -i -e 's|auto_{{replica}}||g' {file}".format(file=stdout_file)) - - # Normalize hostname in stdout file. - os.system("LC_ALL=C sed -i -e 's/{hostname}/localhost/g' {file}".format(hostname=socket.gethostname(), file=stdout_file)) - - stdout = open(stdout_file, 'rb').read() if os.path.exists(stdout_file) else b'' - stdout = str(stdout, errors='replace', encoding='utf-8') - stderr = open(stderr_file, 'rb').read() if os.path.exists(stderr_file) else b'' - stderr = str(stderr, errors='replace', encoding='utf-8') - - return proc, stdout, stderr, total_time - - def get_zookeeper_session_uptime(args): try: query = b"SELECT zookeeperSessionUptime()" @@ -229,8 +100,7 @@ def get_zookeeper_session_uptime(args): (stdout, _) = clickhouse_proc.communicate((query), timeout=20) return int(stdout.decode('utf-8').strip()) - except Exception as ex: - print("Exception", ex) + except: return None @@ -241,7 +111,6 @@ def need_retry(args, stdout, stderr, total_time): session_uptime = get_zookeeper_session_uptime(args) if session_uptime is not None and session_uptime < math.ceil(total_time): return True - return any(msg in stdout for msg in MESSAGES_TO_RETRY) or any(msg in stderr for msg in MESSAGES_TO_RETRY) @@ -360,110 +229,609 @@ def colored(text, args, color=None, on_color=None, attrs=None): return text +class TestStatus(enum.Enum): + FAIL = "FAIL" + UNKNOWN = "UNKNOWN" + OK = "OK" + SKIPPED = "SKIPPED" + + +class FailureReason(enum.Enum): + # FAIL reasons + TIMEOUT = "Timeout!" + SERVER_DIED = "server died" + EXIT_CODE = "return code: " + STDERR = "having stderror: " + EXCEPTION = "having having exception in stdout: " + RESULT_DIFF = "result differs with reference: " + TOO_LONG = "Test runs too long (> 60s). Make it faster." + + # SKIPPED reasons + DISABLED = "disabled" + SKIP = "skip" + NO_JINJA = "no jinja" + NO_ZOOKEEPER = "no zookeeper" + NO_SHARD = "no shard" + FAST_ONLY = "running fast tests only" + NO_LONG = "not running long tests" + REPLICATED_DB = "replicated-database" + BUILD = "not running for current build" + + # UNKNOWN reasons + NO_REFERENCE = "no reference file" + INTERNAL_ERROR = "Test internal error: " + + +class TestResult: + def __init__(self, case_name: str, status: TestStatus, reason: Optional[FailureReason], total_time: float, description: str): + self.case_name: str = case_name + self.status: TestStatus = status + self.reason: Optional[FailureReason] = reason + self.total_time: float = total_time + self.description: str = description + self.need_retry: bool = False + + def check_if_need_retry(self, args, stdout, stderr, runs_count): + if self.status != TestStatus.FAIL: + return + if not need_retry(args, stdout, stderr, self.total_time): + return + if MAX_RETRIES < runs_count: + return + self.need_retry = True + + +class TestCase: + @staticmethod + def get_reference_file(suite_dir, name): + """ + Returns reference file name for specified test + """ + + name = removesuffix(name, ".gen") + for ext in ['.reference', '.gen.reference']: + reference_file = os.path.join(suite_dir, name) + ext + if os.path.isfile(reference_file): + return reference_file + return None + + @staticmethod + def configure_testcase_args(args, case_file, suite_tmp_dir, stderr_file): + testcase_args = copy.deepcopy(args) + + testcase_args.testcase_start_time = datetime.now() + testcase_basename = os.path.basename(case_file) + testcase_args.testcase_client = f"{testcase_args.client} --log_comment='{testcase_basename}'" + + if testcase_args.database: + database = testcase_args.database + os.environ.setdefault("CLICKHOUSE_DATABASE", database) + os.environ.setdefault("CLICKHOUSE_TMP", suite_tmp_dir) + else: + # If --database is not specified, we will create temporary database with unique name + # And we will recreate and drop it for each test + def random_str(length=6): + alphabet = string.ascii_lowercase + string.digits + return ''.join(random.choice(alphabet) for _ in range(length)) + + database = 'test_{suffix}'.format(suffix=random_str()) + + with open(stderr_file, 'w') as stderr: + client_cmd = testcase_args.testcase_client + " " \ + + get_additional_client_options(args) + + clickhouse_proc_create = open_client_process( + universal_newlines=True, + client_args=client_cmd, + stderr_file=stderr) + + try: + clickhouse_proc_create.communicate( + ("CREATE DATABASE " + database + get_db_engine(testcase_args, database)), + timeout=testcase_args.timeout) + except TimeoutExpired: + total_time = (datetime.now() - testcase_args.testcase_start_time).total_seconds() + return clickhouse_proc_create, "", "Timeout creating database {} before test".format( + database), total_time + + os.environ["CLICKHOUSE_DATABASE"] = database + # Set temporary directory to match the randomly generated database, + # because .sh tests also use it for temporary files and we want to avoid + # collisions. + testcase_args.test_tmp_dir = os.path.join(suite_tmp_dir, database) + os.mkdir(testcase_args.test_tmp_dir) + os.environ.setdefault("CLICKHOUSE_TMP", testcase_args.test_tmp_dir) + + testcase_args.testcase_database = database + + return testcase_args + + def __init__(self, suite, case: str, args, is_concurrent: bool): + self.case: str = case # case file name + self.tags: Set[str] = suite.all_tags[case] if case in suite.all_tags else set() + + self.case_file: str = os.path.join(suite.suite_path, case) + (self.name, self.ext) = os.path.splitext(case) + + file_suffix = ('.' + str(os.getpid())) if is_concurrent and args.test_runs > 1 else '' + self.reference_file = self.get_reference_file(suite.suite_path, self.name) + self.stdout_file = os.path.join(suite.suite_tmp_path, self.name) + file_suffix + '.stdout' + self.stderr_file = os.path.join(suite.suite_tmp_path, self.name) + file_suffix + '.stderr' + + self.testcase_args = None + self.runs_count = 0 + + # should skip test, should increment skipped_total, skip reason + def should_skip_test(self, suite) -> Optional[FailureReason]: + tags = self.tags + + if tags and ('disabled' in tags) and not args.disabled: + return FailureReason.DISABLED + + elif os.path.exists(os.path.join(suite.suite_path, self.name) + '.disabled') and not args.disabled: + return FailureReason.DISABLED + + elif args.skip and any(s in self.name for s in args.skip): + return FailureReason.SKIP + + elif not USE_JINJA and self.ext.endswith("j2"): + return FailureReason.NO_JINJA + + elif tags and (('zookeeper' in tags) or ('replica' in tags)) and not args.zookeeper: + return FailureReason.NO_ZOOKEEPER + + elif tags and (('shard' in tags) or ('distributed' in tags) or ('global' in tags)) and not args.shard: + return FailureReason.NO_SHARD + + elif tags and ('no-fasttest' in tags) and args.fast_tests_only: + return FailureReason.FAST_ONLY + + elif tags and (('long' in tags) or ('deadlock' in tags) or ('race' in tags)) and args.no_long: + # Tests for races and deadlocks usually are run in a loop for a significant amount of time + return FailureReason.NO_LONG + + elif tags and ('no-replicated-database' in tags) and args.replicated_database: + return FailureReason.REPLICATED_DB + + elif tags: + for build_flag in args.build_flags: + if 'no-' + build_flag in tags: + return FailureReason.BUILD + + return None + + def process_result_impl(self, proc, stdout: str, stderr: str, total_time: float): + description = "" + + if proc.returncode is None: + try: + proc.kill() + except OSError as e: + if e.errno != ESRCH: + raise + + if stderr: + description += stderr + return TestResult(self.name, TestStatus.FAIL, FailureReason.TIMEOUT, total_time, description) + + if proc.returncode != 0: + reason = FailureReason.EXIT_CODE + description += str(proc.returncode) + + if stderr: + description += "\n" + description += stderr + + # Stop on fatal errors like segmentation fault. They are sent to client via logs. + if ' ' in stderr: + reason = FailureReason.SERVER_DIED + + if self.testcase_args.stop \ + and ('Connection refused' in stderr or 'Attempt to read after eof' in stderr) \ + and 'Received exception from server' not in stderr: + reason = FailureReason.SERVER_DIED + + if os.path.isfile(self.stdout_file): + description += ", result:\n\n" + description += '\n'.join(open(self.stdout_file).read().split('\n')[:100]) + description += '\n' + + description += "\nstdout:\n{}\n".format(stdout) + return TestResult(self.name, TestStatus.FAIL, reason, total_time, description) + + if stderr: + description += "\n{}\n".format('\n'.join(stderr.split('\n')[:100])) + description += "\nstdout:\n{}\n".format(stdout) + return TestResult(self.name, TestStatus.FAIL, FailureReason.STDERR, total_time, description) + + if 'Exception' in stdout: + description += "\n{}\n".format('\n'.join(stdout.split('\n')[:100])) + return TestResult(self.name, TestStatus.FAIL, FailureReason.EXCEPTION, total_time, description) + + if '@@SKIP@@' in stdout: + skip_reason = stdout.replace('@@SKIP@@', '').rstrip("\n") + description += " - " + description += skip_reason + return TestResult(self.name, TestStatus.SKIPPED, FailureReason.SKIP, total_time, description) + + if self.reference_file is None: + return TestResult(self.name, TestStatus.UNKNOWN, FailureReason.NO_REFERENCE, total_time, description) + + result_is_different = subprocess.call(['diff', '-q', self.reference_file, self.stdout_file], stdout=PIPE) + + if result_is_different: + diff = Popen(['diff', '-U', str(self.testcase_args.unified), self.reference_file, self.stdout_file], stdout=PIPE, + universal_newlines=True).communicate()[0] + description += "\n{}\n".format(diff) + return TestResult(self.name, TestStatus.FAIL, FailureReason.RESULT_DIFF, total_time, description) + + if self.testcase_args.test_runs > 1 and total_time > 60 and 'long' not in self.name: + # We're in Flaky Check mode, check the run time as well while we're at it. + return TestResult(self.name, TestStatus.FAIL, FailureReason.TOO_LONG, total_time, description) + + if os.path.exists(self.stdout_file): + os.remove(self.stdout_file) + if os.path.exists(self.stderr_file): + os.remove(self.stderr_file) + + return TestResult(self.name, TestStatus.OK, None, total_time, description) + + @staticmethod + def print_test_time(test_time) -> str: + if args.print_time: + return " {0:.2f} sec.".format(test_time) + else: + return '' + + def process_result(self, result: TestResult, messages): + description_full = messages[result.status] + description_full += self.print_test_time(result.total_time) + if result.reason is not None: + description_full += " - " + description_full += result.reason.value + + description_full += result.description + description_full += "\n" + + if result.status == TestStatus.FAIL: + description_full += 'Database: ' + self.testcase_args.testcase_database + + result.description = description_full + return result + + @staticmethod + def send_test_name_failed(suite: str, case: str) -> bool: + clickhouse_proc = open_client_process(args.client, universal_newlines=True) + + failed_to_check = False + + pid = os.getpid() + query = f"SELECT 'Running test {suite}/{case} from pid={pid}';" + + try: + clickhouse_proc.communicate((query), timeout=20) + except: + failed_to_check = True + + return failed_to_check or clickhouse_proc.returncode != 0 + + def run_single_test(self, server_logs_level, client_options): + args = self.testcase_args + client = args.testcase_client + start_time = args.testcase_start_time + database = args.testcase_database + + # This is for .sh tests + os.environ["CLICKHOUSE_LOG_COMMENT"] = self.case_file + + params = { + 'client': client + ' --database=' + database, + 'logs_level': server_logs_level, + 'options': client_options, + 'test': self.case_file, + 'stdout': self.stdout_file, + 'stderr': self.stderr_file, + } + + # >> append to stderr (but not stdout since it is not used there), + # because there are also output of per test database creation + if not args.database: + pattern = '{test} > {stdout} 2>> {stderr}' + else: + pattern = '{test} > {stdout} 2> {stderr}' + + if self.ext == '.sql': + pattern = "{client} --send_logs_level={logs_level} --testmode --multiquery {options} < " + pattern + + command = pattern.format(**params) + + proc = Popen(command, shell=True, env=os.environ) + + while (datetime.now() - start_time).total_seconds() < args.timeout and proc.poll() is None: + sleep(0.01) + + need_drop_database = not args.database + if need_drop_database and args.no_drop_if_fail: + maybe_passed = (proc.returncode == 0) and (proc.stderr is None) and ( + proc.stdout is None or 'Exception' not in proc.stdout) + need_drop_database = not maybe_passed + + if need_drop_database: + with open(self.stderr_file, 'a') as stderr: + clickhouse_proc_create = open_client_process(client, universal_newlines=True, stderr_file=stderr) + + seconds_left = max(args.timeout - (datetime.now() - start_time).total_seconds(), 20) + + try: + drop_database_query = "DROP DATABASE " + database + if args.replicated_database: + drop_database_query += " ON CLUSTER test_cluster_database_replicated" + clickhouse_proc_create.communicate((drop_database_query), timeout=seconds_left) + except TimeoutExpired: + # kill test process because it can also hung + if proc.returncode is None: + try: + proc.kill() + except OSError as e: + if e.errno != ESRCH: + raise + + total_time = (datetime.now() - start_time).total_seconds() + return clickhouse_proc_create, "", f"Timeout dropping database {database} after test", total_time + + shutil.rmtree(args.test_tmp_dir) + + total_time = (datetime.now() - start_time).total_seconds() + + # Normalize randomized database names in stdout, stderr files. + os.system("LC_ALL=C sed -i -e 's/{test_db}/default/g' {file}".format(test_db=database, file=self.stdout_file)) + if args.hide_db_name: + os.system( + "LC_ALL=C sed -i -e 's/{test_db}/default/g' {file}".format(test_db=database, file=self.stderr_file)) + if args.replicated_database: + os.system("LC_ALL=C sed -i -e 's|/auto_{{shard}}||g' {file}".format(file=self.stdout_file)) + os.system("LC_ALL=C sed -i -e 's|auto_{{replica}}||g' {file}".format(file=self.stdout_file)) + + # Normalize hostname in stdout file. + os.system("LC_ALL=C sed -i -e 's/{hostname}/localhost/g' {file}".format(hostname=socket.gethostname(), + file=self.stdout_file)) + + stdout = open(self.stdout_file, 'rb').read() if os.path.exists(self.stdout_file) else b'' + stdout = str(stdout, errors='replace', encoding='utf-8') + stderr = open(self.stderr_file, 'rb').read() if os.path.exists(self.stderr_file) else b'' + stderr = str(stderr, errors='replace', encoding='utf-8') + + return proc, stdout, stderr, total_time + + def run(self, args, suite, client_options, server_logs_level): + try: + skip_reason = self.should_skip_test(suite) + if skip_reason is not None: + return TestResult(self.name, TestStatus.SKIPPED, skip_reason, 0., "") + + if args.testname and self.send_test_name_failed(suite, self.case): + description = "\nServer does not respond to health check\n" + return TestResult(self.name, TestStatus.FAIL, FailureReason.SERVER_DIED, 0., description) + + self.runs_count += 1 + self.testcase_args = self.configure_testcase_args(args, self.case_file, suite.suite_tmp_path, self.stderr_file) + proc, stdout, stderr, total_time = self.run_single_test(server_logs_level, client_options) + + result = self.process_result_impl(proc, stdout, stderr, total_time) + result.check_if_need_retry(args, stdout, stderr, self.runs_count) + return result + except KeyboardInterrupt as e: + raise e + except: + exc_type, exc_value, tb = sys.exc_info() + exc_name = exc_type.__name__ + traceback_str = "\n".join(traceback.format_tb(tb, 10)) + description = f"{exc_name}\n{exc_value}\n{traceback_str}" + return TestResult(self.name, TestStatus.UNKNOWN, FailureReason.INTERNAL_ERROR, 0., description) + + +class TestSuite: + @staticmethod + def tests_in_suite_key_func(item: str) -> int: + if args.order == 'random': + return random.random() + + reverse = 1 if args.order == 'asc' else -1 + + if -1 == item.find('_'): + return 99998 + + prefix, _ = item.split('_', 1) + + try: + return reverse * int(prefix) + except ValueError: + return 99997 + + @staticmethod + def render_test_template(j2env, suite_dir, test_name): + """ + Render template for test and reference file if needed + """ + + if j2env is None: + return test_name + + test_base_name = removesuffix(test_name, ".sql.j2", ".sql") + + reference_file_name = test_base_name + ".reference.j2" + reference_file_path = os.path.join(suite_dir, reference_file_name) + if os.path.isfile(reference_file_path): + tpl = j2env.get_template(reference_file_name) + tpl.stream().dump(os.path.join(suite_dir, test_base_name) + ".gen.reference") + + if test_name.endswith(".sql.j2"): + tpl = j2env.get_template(test_name) + generated_test_name = test_base_name + ".gen.sql" + tpl.stream().dump(os.path.join(suite_dir, generated_test_name)) + return generated_test_name + + return test_name + + @staticmethod + def read_test_tags(suite_dir: str, all_tests: List[str]) -> Dict[str, Set[str]]: + def get_comment_sign(filename): + if filename.endswith('.sql') or filename.endswith('.sql.j2'): + return '--' + elif filename.endswith('.sh') or filename.endswith('.py') or filename.endswith('.expect'): + return '#' + else: + raise Exception(f'Unknown file_extension: {filename}') + + def parse_tags_from_line(line, comment_sign): + if not line.startswith(comment_sign): + return None + tags_str = line[len(comment_sign):].lstrip() + tags_prefix = "Tags:" + if not tags_str.startswith(tags_prefix): + return None + tags_str = tags_str[len(tags_prefix):] + tags = tags_str.split(',') + tags = {tag.strip() for tag in tags} + return tags + + def is_shebang(line): + return line.startswith('#!') + + def load_tags_from_file(filepath): + with open(filepath, 'r') as file: + try: + line = file.readline() + if is_shebang(line): + line = file.readline() + except UnicodeDecodeError: + return [] + return parse_tags_from_line(line, get_comment_sign(filepath)) + + all_tags = {} + start_time = datetime.now() + for test_name in all_tests: + tags = load_tags_from_file(os.path.join(suite_dir, test_name)) + if tags: + all_tags[test_name] = tags + elapsed = (datetime.now() - start_time).total_seconds() + if elapsed > 1: + print(f"Tags for suite {suite_dir} read in {elapsed:.2f} seconds") + return all_tags + + def __init__(self, args, suite_path: str, suite_tmp_path: str, suite: str): + self.args = args + self.suite_path: str = suite_path + self.suite_tmp_path: str = suite_tmp_path + self.suite: str = suite + + self.all_tests: List[str] = self.get_tests_list(self.tests_in_suite_key_func) + self.all_tags: Dict[str, Set[str]] = self.read_test_tags(self.suite_path, self.all_tests) + + self.sequential_tests = [] + self.parallel_tests = [] + for test_name in self.all_tests: + if self.is_sequential_test(test_name): + self.sequential_tests.append(test_name) + else: + self.parallel_tests.append(test_name) + + def is_sequential_test(self, test_name): + if args.sequential: + if any(s in test_name for s in args.sequential): + return True + + if test_name not in self.all_tags: + return False + + return ('no-parallel' in self.all_tags[test_name]) or ('sequential' in self.all_tags[test_name]) + + def get_tests_list(self, sort_key): + """ + Return list of tests file names to run + """ + + all_tests = list(self.get_selected_tests()) + all_tests = all_tests * self.args.test_runs + all_tests.sort(key=sort_key) + return all_tests + + def get_selected_tests(self): + """ + Find all files with tests, filter, render templates + """ + + j2env = jinja2.Environment( + loader=jinja2.FileSystemLoader(self.suite_path), + keep_trailing_newline=True, + ) if USE_JINJA else None + + for test_name in os.listdir(self.suite_path): + if not is_test_from_dir(self.suite_path, test_name): + continue + if self.args.test and not any(re.search(pattern, test_name) for pattern in self.args.test): + continue + if USE_JINJA and test_name.endswith(".gen.sql"): + continue + test_name = self.render_test_template(j2env, self.suite_path, test_name) + yield test_name + + @staticmethod + def readTestSuite(args, suite_dir_name: str): + def is_data_present(): + clickhouse_proc = open_client_process(args.client) + (stdout, stderr) = clickhouse_proc.communicate(b"EXISTS TABLE test.hits") + if clickhouse_proc.returncode != 0: + raise CalledProcessError(clickhouse_proc.returncode, args.client, stderr) + + return stdout.startswith(b'1') + + base_dir = os.path.abspath(args.queries) + tmp_dir = os.path.abspath(args.tmp) + suite_path = os.path.join(base_dir, suite_dir_name) + + suite_re_obj = re.search('^[0-9]+_(.*)$', suite_dir_name) + if not suite_re_obj: # skip .gitignore and so on + return None + + suite_tmp_path = os.path.join(tmp_dir, suite_dir_name) + if not os.path.exists(suite_tmp_path): + os.makedirs(suite_tmp_path) + + suite = suite_re_obj.group(1) + + if not os.path.isdir(suite_path): + return None + + if 'stateful' in suite and not args.no_stateful and not is_data_present(): + print("Won't run stateful tests because test data wasn't loaded.") + return None + if 'stateless' in suite and args.no_stateless: + print("Won't run stateless tests because they were manually disabled.") + return None + if 'stateful' in suite and args.no_stateful: + print("Won't run stateful tests because they were manually disabled.") + return None + + return TestSuite(args, suite_path, suite_tmp_path, suite) + + stop_time = None exit_code = multiprocessing.Value("i", 0) server_died = multiprocessing.Event() stop_tests_triggered_lock = multiprocessing.Lock() stop_tests_triggered = multiprocessing.Event() queue = multiprocessing.Queue(maxsize=1) +multiprocessing_manager = multiprocessing.Manager() +restarted_tests = multiprocessing_manager.list() - -def print_test_time(test_time) -> str: - if args.print_time: - return " {0:.2f} sec.".format(test_time) - else: - return '' - - -# should skip test, should increment skipped_total, skip reason -def should_skip_test(name: str, test_ext: str, suite_dir: str, all_tags: Dict[str, Set[str]]) -> Tuple[bool, bool, str]: - tags = all_tags.get(name + test_ext) - - should_skip = False - increment_skip_count = False - skip_reason = '' - - if tags and ('disabled' in tags) and not args.disabled: - should_skip = True - increment_skip_count = False - skip_reason = 'disabled' - - elif os.path.exists(os.path.join(suite_dir, name) + '.disabled') and not args.disabled: - should_skip = True - increment_skip_count = False - skip_reason = 'disabled' - - elif args.skip and any(s in name for s in args.skip): - should_skip = True - increment_skip_count = True - skip_reason = 'skip' - - elif not USE_JINJA and test_ext.endswith("j2"): - should_skip = True - increment_skip_count = True - skip_reason = 'no jinja' - - elif tags and (('zookeeper' in tags) or ('replica' in tags)) and not args.zookeeper: - should_skip = True - increment_skip_count = True - skip_reason = 'no zookeeper' - - elif tags and (('shard' in tags) or ('distributed' in tags) or ('global' in tags)) and not args.shard: - should_skip = True - increment_skip_count = True - skip_reason = 'no shard' - - elif tags and ('no-fasttest' in tags) and args.fast_tests_only: - should_skip = True - increment_skip_count = True - skip_reason = 'running fast tests only' - - elif tags and (('long' in tags) or ('deadlock' in tags) or ('race' in tags)) and args.no_long: - # Tests for races and deadlocks usually are run in a loop for a significant amount of time - should_skip = True - increment_skip_count = True - skip_reason = 'not running long tests' - - elif tags and ('no-replicated-database' in tags) and args.replicated_database: - should_skip = True - increment_skip_count = True - skip_reason = 'replicated-database' - - elif tags: - for build_flag in args.build_flags: - if 'no-' + build_flag in tags: - should_skip = True - increment_skip_count = True - skip_reason = build_flag - break - - return should_skip, increment_skip_count, skip_reason - - -def send_test_name_failed(suite: str, case: str) -> bool: - clickhouse_proc = open_client_process(args.client, universal_newlines=True) - - failed_to_check = False - - pid = os.getpid() - query = f"SELECT 'Running test {suite}/{case} from pid={pid}';" - - try: - clickhouse_proc.communicate((query), timeout=20) - except: - failed_to_check = True - - return failed_to_check or clickhouse_proc.returncode != 0 - - -restarted_tests = [] # (test, stderr) - -# def run_tests_array(all_tests, num_tests, suite, suite_dir, suite_tmp_dir, all_tags): +# def run_tests_array(all_tests: List[str], num_tests: int, test_suite: TestSuite): def run_tests_array(all_tests_with_params): - all_tests, num_tests, suite, suite_dir, suite_tmp_dir, all_tags = all_tests_with_params + all_tests, num_tests, test_suite = all_tests_with_params global stop_time global exit_code global server_died + global restarted_tests OP_SQUARE_BRACKET = colored("[", args, attrs=['bold']) CL_SQUARE_BRACKET = colored("]", args, attrs=['bold']) @@ -473,10 +841,11 @@ def run_tests_array(all_tests_with_params): MSG_OK = OP_SQUARE_BRACKET + colored(" OK ", args, "green", attrs=['bold']) + CL_SQUARE_BRACKET MSG_SKIPPED = OP_SQUARE_BRACKET + colored(" SKIPPED ", args, "cyan", attrs=['bold']) + CL_SQUARE_BRACKET + MESSAGES = {TestStatus.FAIL: MSG_FAIL, TestStatus.UNKNOWN: MSG_UNKNOWN, TestStatus.OK: MSG_OK, TestStatus.SKIPPED: MSG_SKIPPED} + passed_total = 0 skipped_total = 0 failures_total = 0 - failures = 0 failures_chain = 0 start_time = datetime.now() @@ -487,7 +856,7 @@ def run_tests_array(all_tests_with_params): if num_tests > 0: about = 'about ' if is_concurrent else '' proc_name = multiprocessing.current_process().name - print(f"\nRunning {about}{num_tests} {suite} tests ({proc_name}).\n") + print(f"\nRunning {about}{num_tests} {test_suite.suite} tests ({proc_name}).\n") while True: if is_concurrent: @@ -509,182 +878,56 @@ def run_tests_array(all_tests_with_params): stop_tests() break - case_file = os.path.join(suite_dir, case) - (name, ext) = os.path.splitext(case) + test_case = TestCase(test_suite, case, args, is_concurrent) try: - status = '' + description = '' if not is_concurrent: sys.stdout.flush() - sys.stdout.write("{0:72}".format(removesuffix(name, ".gen", ".sql") + ": ")) + sys.stdout.write("{0:72}".format(removesuffix(test_case.name, ".gen", ".sql") + ": ")) # This flush is needed so you can see the test name of the long # running test before it will finish. But don't do it in parallel # mode, so that the lines don't mix. sys.stdout.flush() else: - status = "{0:72}".format(removesuffix(name, ".gen", ".sql") + ": ") + description = "{0:72}".format(removesuffix(test_case.name, ".gen", ".sql") + ": ") - skip_test, increment_skip_count, skip_reason = \ - should_skip_test(name, ext, suite_dir, all_tags) + while True: + test_result = test_case.run(args, test_suite, client_options, server_logs_level) + test_result = test_case.process_result(test_result, MESSAGES) + if not test_result.need_retry: + break + restarted_tests.append(test_result) - if skip_test: - status += MSG_SKIPPED + f" - {skip_reason}\n" - - if increment_skip_count: - skipped_total += 1 - else: - if args.testname and send_test_name_failed(suite, case): - failures += 1 - print("Server does not respond to health check") + if test_result.status == TestStatus.OK: + passed_total += 1 + failures_chain = 0 + elif test_result.status == TestStatus.FAIL: + failures_total += 1 + failures_chain += 1 + if test_result.reason == FailureReason.SERVER_DIED: server_died.set() stop_tests() - break - file_suffix = ('.' + str(os.getpid())) if is_concurrent and args.test_runs > 1 else '' - reference_file = get_reference_file(suite_dir, name) - stdout_file = os.path.join(suite_tmp_dir, name) + file_suffix + '.stdout' - stderr_file = os.path.join(suite_tmp_dir, name) + file_suffix + '.stderr' + elif test_result.status == TestStatus.SKIPPED: + skipped_total += 1 - testcase_args = configure_testcase_args(args, case_file, suite_tmp_dir, stderr_file) - proc, stdout, stderr, total_time = run_single_test(testcase_args, ext, server_logs_level, client_options, case_file, stdout_file, stderr_file) + description += test_result.description - if proc.returncode is None: - try: - proc.kill() - except OSError as e: - if e.errno != ESRCH: - raise + if description and not description.endswith('\n'): + description += '\n' - failures += 1 - status += MSG_FAIL - status += print_test_time(total_time) - status += " - Timeout!\n" - if stderr: - status += stderr - status += 'Database: ' + testcase_args.testcase_database - else: - counter = 1 - while need_retry(args, stdout, stderr, total_time): - restarted_tests.append((case_file, stderr)) - testcase_args = configure_testcase_args(args, case_file, suite_tmp_dir, stderr_file) - proc, stdout, stderr, total_time = run_single_test(testcase_args, ext, server_logs_level, client_options, case_file, stdout_file, stderr_file) - sleep(2**counter) - counter += 1 - if MAX_RETRIES < counter: - if args.replicated_database: - if DISTRIBUTED_DDL_TIMEOUT_MSG in stderr: - server_died.set() - break - - if proc.returncode != 0: - failures += 1 - failures_chain += 1 - status += MSG_FAIL - status += print_test_time(total_time) - status += ' - return code {}\n'.format(proc.returncode) - - if stderr: - status += stderr - - # Stop on fatal errors like segmentation fault. They are sent to client via logs. - if ' ' in stderr: - server_died.set() - - if testcase_args.stop \ - and ('Connection refused' in stderr or 'Attempt to read after eof' in stderr) \ - and 'Received exception from server' not in stderr: - server_died.set() - - if os.path.isfile(stdout_file): - status += ", result:\n\n" - status += '\n'.join( - open(stdout_file).read().split('\n')[:100]) - status += '\n' - - status += "\nstdout:\n{}\n".format(stdout) - status += 'Database: ' + testcase_args.testcase_database - - elif stderr: - failures += 1 - failures_chain += 1 - status += MSG_FAIL - status += print_test_time(total_time) - status += " - having stderror:\n{}\n".format( - '\n'.join(stderr.split('\n')[:100])) - status += "\nstdout:\n{}\n".format(stdout) - status += 'Database: ' + testcase_args.testcase_database - elif 'Exception' in stdout: - failures += 1 - failures_chain += 1 - status += MSG_FAIL - status += print_test_time(total_time) - status += " - having exception in stdout:\n{}\n".format( - '\n'.join(stdout.split('\n')[:100])) - status += 'Database: ' + testcase_args.testcase_database - elif '@@SKIP@@' in stdout: - skipped_total += 1 - skip_reason = stdout.replace('@@SKIP@@', '').rstrip("\n") - status += MSG_SKIPPED + f" - {skip_reason}\n" - elif reference_file is None: - status += MSG_UNKNOWN - status += print_test_time(total_time) - status += " - no reference file\n" - status += 'Database: ' + testcase_args.testcase_database - else: - result_is_different = subprocess.call(['diff', '-q', reference_file, stdout_file], stdout=PIPE) - - if result_is_different: - diff = Popen(['diff', '-U', str(testcase_args.unified), reference_file, stdout_file], stdout=PIPE, universal_newlines=True).communicate()[0] - failures += 1 - status += MSG_FAIL - status += print_test_time(total_time) - status += " - result differs with reference:\n{}\n".format(diff) - status += 'Database: ' + testcase_args.testcase_database - else: - if testcase_args.test_runs > 1 and total_time > 60 and 'long' not in name: - # We're in Flaky Check mode, check the run time as well while we're at it. - failures += 1 - failures_chain += 1 - status += MSG_FAIL - status += print_test_time(total_time) - status += " - Test runs too long (> 60s). Make it faster.\n" - status += 'Database: ' + testcase_args.testcase_database - else: - passed_total += 1 - failures_chain = 0 - status += MSG_OK - status += print_test_time(total_time) - status += "\n" - if os.path.exists(stdout_file): - os.remove(stdout_file) - if os.path.exists(stderr_file): - os.remove(stderr_file) - - if status and not status.endswith('\n'): - status += '\n' - - sys.stdout.write(status) + sys.stdout.write(description) sys.stdout.flush() except KeyboardInterrupt as e: print(colored("Break tests execution", args, "red")) stop_tests() raise e - except: - exc_type, exc_value, tb = sys.exc_info() - failures += 1 - - exc_name = exc_type.__name__ - traceback_str = "\n".join(traceback.format_tb(tb, 10)) - - print(f"{MSG_FAIL} - Test internal error: {exc_name}") - print(f"{exc_value}\n{traceback_str}") if failures_chain >= 20: stop_tests() break - failures_total = failures_total + failures - if failures_total > 0: print(colored(f"\nHaving {failures_total} errors! {passed_total} tests passed." f" {skipped_total} tests skipped. {(datetime.now() - start_time).total_seconds():.2f} s elapsed" @@ -725,18 +968,12 @@ def check_server_started(client, retry_count): sleep(0.5) continue - # FIXME Some old comment, maybe now CH supports Python3 ? - # We can't print this, because for some reason this is python 2, - # and args appeared in 3.3. To hell with it. - # print(''.join(clickhouse_proc.args)) - - # Other kind of error, fail. - code: int = clickhouse_proc.returncode print(f"\nClient invocation failed with code {code}:\n\ stdout: {stdout}\n\ - stderr: {stderr}") + stderr: {stderr}\n\ + args: {''.join(clickhouse_proc.args)}\n") sys.stdout.flush() @@ -833,23 +1070,6 @@ def suite_key_func(item: str) -> Union[int, Tuple[int, str]]: return 99997, '' -def tests_in_suite_key_func(item: str) -> int: - if args.order == 'random': - return random.random() - - reverse = 1 if args.order == 'asc' else -1 - - if -1 == item.find('_'): - return 99998 - - prefix, _ = item.split('_', 1) - - try: - return reverse * int(prefix) - except ValueError: - return 99997 - - def extract_key(key: str) -> str: return subprocess.getstatusoutput( args.extract_from_config + @@ -867,14 +1087,13 @@ def open_client_process( universal_newlines=True if universal_newlines else None) - -def do_run_tests(jobs, suite, suite_dir, suite_tmp_dir, all_tests, all_tags, parallel_tests, sequential_tests, parallel): - if jobs > 1 and len(parallel_tests) > 0: - print("Found", len(parallel_tests), "parallel tests and", len(sequential_tests), "sequential tests") +def do_run_tests(jobs, test_suite: TestSuite, parallel): + if jobs > 1 and len(test_suite.parallel_tests) > 0: + print("Found", len(test_suite.parallel_tests), "parallel tests and", len(test_suite.sequential_tests), "sequential tests") run_n, run_total = parallel.split('/') run_n = float(run_n) run_total = float(run_total) - tests_n = len(parallel_tests) + tests_n = len(test_suite.parallel_tests) if run_total > tests_n: run_total = tests_n @@ -883,15 +1102,15 @@ def do_run_tests(jobs, suite, suite_dir, suite_tmp_dir, all_tests, all_tags, par if jobs > run_total: run_total = jobs - batch_size = max(1, len(parallel_tests) // jobs) + batch_size = max(1, len(test_suite.parallel_tests) // jobs) parallel_tests_array = [] for _ in range(jobs): - parallel_tests_array.append((None, batch_size, suite, suite_dir, suite_tmp_dir, all_tags)) + parallel_tests_array.append((None, batch_size, test_suite)) with closing(multiprocessing.Pool(processes=jobs)) as pool: pool.map_async(run_tests_array, parallel_tests_array) - for suit in parallel_tests: + for suit in test_suite.parallel_tests: queue.put(suit) for _ in range(jobs): @@ -901,11 +1120,11 @@ def do_run_tests(jobs, suite, suite_dir, suite_tmp_dir, all_tests, all_tags, par pool.join() - run_tests_array((sequential_tests, len(sequential_tests), suite, suite_dir, suite_tmp_dir, all_tags)) - return len(sequential_tests) + len(parallel_tests) + run_tests_array((test_suite.sequential_tests, len(test_suite.sequential_tests), test_suite)) + return len(test_suite.sequential_tests) + len(test_suite.parallel_tests) else: - num_tests = len(all_tests) - run_tests_array((all_tests, num_tests, suite, suite_dir, suite_tmp_dir, all_tags)) + num_tests = len(test_suite.all_tests) + run_tests_array((test_suite.all_tests, num_tests, test_suite)) return num_tests @@ -930,89 +1149,12 @@ def removesuffix(text, *suffixes): return text -def render_test_template(j2env, suite_dir, test_name): - """ - Render template for test and reference file if needed - """ - - if j2env is None: - return test_name - - test_base_name = removesuffix(test_name, ".sql.j2", ".sql") - - reference_file_name = test_base_name + ".reference.j2" - reference_file_path = os.path.join(suite_dir, reference_file_name) - if os.path.isfile(reference_file_path): - tpl = j2env.get_template(reference_file_name) - tpl.stream().dump(os.path.join(suite_dir, test_base_name) + ".gen.reference") - - if test_name.endswith(".sql.j2"): - tpl = j2env.get_template(test_name) - generated_test_name = test_base_name + ".gen.sql" - tpl.stream().dump(os.path.join(suite_dir, generated_test_name)) - return generated_test_name - - return test_name - - -def get_selected_tests(suite_dir, patterns): - """ - Find all files with tests, filter, render templates - """ - - j2env = jinja2.Environment( - loader=jinja2.FileSystemLoader(suite_dir), - keep_trailing_newline=True, - ) if USE_JINJA else None - - for test_name in os.listdir(suite_dir): - if not is_test_from_dir(suite_dir, test_name): - continue - if patterns and not any(re.search(pattern, test_name) for pattern in patterns): - continue - if USE_JINJA and test_name.endswith(".gen.sql"): - continue - test_name = render_test_template(j2env, suite_dir, test_name) - yield test_name - - -def get_tests_list(suite_dir, patterns, test_runs, sort_key): - """ - Return list of tests file names to run - """ - - all_tests = list(get_selected_tests(suite_dir, patterns)) - all_tests = all_tests * test_runs - all_tests.sort(key=sort_key) - return all_tests - - -def get_reference_file(suite_dir, name): - """ - Returns reference file name for specified test - """ - - name = removesuffix(name, ".gen") - for ext in ['.reference', '.gen.reference']: - reference_file = os.path.join(suite_dir, name) + ext - if os.path.isfile(reference_file): - return reference_file - return None - - def main(args): global server_died global stop_time global exit_code global server_logs_level - - def is_data_present(): - clickhouse_proc = open_client_process(args.client) - (stdout, stderr) = clickhouse_proc.communicate(b"EXISTS TABLE test.hits") - if clickhouse_proc.returncode != 0: - raise CalledProcessError(clickhouse_proc.returncode, args.client, stderr) - - return stdout.startswith(b'1') + global restarted_tests if not check_server_started(args.client, args.server_check_retries): msg = "Server is not responding. Cannot execute 'SELECT 1' query. \ @@ -1085,46 +1227,11 @@ def main(args): if server_died.is_set(): break - suite_dir = os.path.join(base_dir, suite) - suite_re_obj = re.search('^[0-9]+_(.*)$', suite) - if not suite_re_obj: # skip .gitignore and so on + test_suite = TestSuite.readTestSuite(args, suite) + if test_suite is None: continue - suite_tmp_dir = os.path.join(tmp_dir, suite) - if not os.path.exists(suite_tmp_dir): - os.makedirs(suite_tmp_dir) - - suite = suite_re_obj.group(1) - - if os.path.isdir(suite_dir): - if 'stateful' in suite and not args.no_stateful and not is_data_present(): - print("Won't run stateful tests because test data wasn't loaded.") - continue - if 'stateless' in suite and args.no_stateless: - print("Won't run stateless tests because they were manually disabled.") - continue - if 'stateful' in suite and args.no_stateful: - print("Won't run stateful tests because they were manually disabled.") - continue - - all_tests = get_tests_list( - suite_dir, args.test, args.test_runs, tests_in_suite_key_func) - - all_tags = read_test_tags(suite_dir, all_tests) - - sequential_tests = [] - if args.sequential: - for test in all_tests: - if any(s in test for s in args.sequential): - sequential_tests.append(test) - else: - sequential_tests = collect_sequential_list(all_tags) - - sequential_tests_set = set(sequential_tests) - parallel_tests = [test for test in all_tests if test not in sequential_tests_set] - - total_tests_run += do_run_tests( - args.jobs, suite, suite_dir, suite_tmp_dir, all_tests, all_tags, parallel_tests, sequential_tests, args.parallel) + total_tests_run += do_run_tests(args.jobs, test_suite, args.parallel) if server_died.is_set(): exit_code.value = 1 @@ -1154,8 +1261,12 @@ def main(args): if len(restarted_tests) > 0: print("\nSome tests were restarted:\n") - for (test_case, stderr) in restarted_tests: - print(test_case + "\n" + stderr + "\n") + for test_result in restarted_tests: + print("\n{0:72}: ".format(test_result.case_name)) + # replace it with lowercase to avoid parsing retried tests as failed + for status in TestStatus: + test_result.status = test_result.status.value.replace(status.value, status.value.lower()) + print(test_result.description) if total_tests_run == 0: print("No tests were run.") @@ -1196,60 +1307,6 @@ def get_additional_client_options_url(args): return '' -def read_test_tags(suite_dir: str, all_tests: List[str]) -> Dict[str, Set[str]]: - def get_comment_sign(filename): - if filename.endswith('.sql') or filename.endswith('.sql.j2'): - return '--' - elif filename.endswith('.sh') or filename.endswith('.py') or filename.endswith('.expect'): - return '#' - else: - raise Exception(f'Unknown file_extension: {filename}') - - def parse_tags_from_line(line, comment_sign): - if not line.startswith(comment_sign): - return None - tags_str = line[len(comment_sign):].lstrip() - tags_prefix = "Tags:" - if not tags_str.startswith(tags_prefix): - return None - tags_str = tags_str[len(tags_prefix):] - tags = tags_str.split(',') - tags = {tag.strip() for tag in tags} - return tags - - def is_shebang(line): - return line.startswith('#!') - - def load_tags_from_file(filepath): - with open(filepath, 'r') as file: - try: - line = file.readline() - if is_shebang(line): - line = file.readline() - except UnicodeDecodeError: - return [] - return parse_tags_from_line(line, get_comment_sign(filepath)) - - all_tags = {} - start_time = datetime.now() - for test_name in all_tests: - tags = load_tags_from_file(os.path.join(suite_dir, test_name)) - if tags: - all_tags[test_name] = tags - elapsed = (datetime.now() - start_time).total_seconds() - if elapsed > 1: - print(f"Tags for suite {suite_dir} read in {elapsed:.2f} seconds") - return all_tags - - -def collect_sequential_list(all_tags: Dict[str, Set[str]]) -> List[str]: - res = [] - for test_name, tags in all_tags.items(): - if ('no-parallel' in tags) or ('sequential' in tags): - res.append(test_name) - return res - - if __name__ == '__main__': # Move to a new process group and kill it at exit so that we don't have any # infinite tests processes left