introduce sqllogic runner

This commit is contained in:
Sema Checherinda 2023-04-16 12:11:35 +02:00
parent 000a63d61c
commit c3b9d5fe6e
16 changed files with 7287 additions and 4649 deletions

File diff suppressed because it is too large Load Diff

View File

@ -151,5 +151,9 @@
"name": "clickhouse/docs-builder",
"dependent": [
]
},
"docker/test/sqllogic": {
"name": "clickhouse/sqllogic-test",
"dependent": []
}
}

View File

@ -0,0 +1,45 @@
# docker build -t clickhouse/sqllogic-test .
ARG FROM_TAG=latest
FROM clickhouse/test-base:$FROM_TAG
RUN apt-get update --yes \
&& env DEBIAN_FRONTEND=noninteractive \
apt-get install --yes --no-install-recommends \
wget \
git \
python3 \
python3-dev \
python3-pip \
sqlite3 \
unixodbc \
unixodbc-dev \
sudo \
&& apt-get clean
RUN pip3 install \
numpy \
pyodbc \
deepdiff
ARG odbc_repo="https://github.com/ClickHouse/clickhouse-odbc.git"
RUN git clone --recursive ${odbc_repo} \
&& mkdir -p /clickhouse-odbc/build \
&& cmake -S /clickhouse-odbc -B /clickhouse-odbc/build \
&& ls /clickhouse-odbc/build/driver \
&& make -j 10 -C /clickhouse-odbc/build \
&& ls /clickhouse-odbc/build/driver \
&& mkdir -p /usr/local/lib64/ && cp /clickhouse-odbc/build/driver/lib*.so /usr/local/lib64/ \
&& odbcinst -i -d -f /clickhouse-odbc/packaging/odbcinst.ini.sample \
&& odbcinst -i -s -l -f /clickhouse-odbc/packaging/odbc.ini.sample
ENV TZ=Europe/Amsterdam
ENV MAX_RUN_TIME=900
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
ARG sqllogic_test_repo="https://github.com/gregrahn/sqllogictest.git"
RUN git clone --recursive ${sqllogic_test_repo}
COPY run.sh /
CMD ["/bin/bash", "/run.sh"]

View File

@ -0,0 +1,43 @@
# docker build -t clickhouse/sqllogic-test .
ARG FROM_TAG=latest
FROM clickhouse/test-base:$FROM_TAG
RUN apt-get update --yes \
&& env DEBIAN_FRONTEND=noninteractive \
apt-get install --yes --no-install-recommends \
wget \
git \
python3 \
python3-dev \
python3-pip \
sqlite3 \
unixodbc \
unixodbc-dev \
sudo \
unzip \
&& apt-get clean
RUN pip3 install \
numpy \
pyodbc
# ARG odbc_driver_url="https://github.com/ClickHouse/clickhouse-odbc/releases/download/v1.1.4.20200302/clickhouse-odbc-1.1.4-Linux.tar.gz"
ARG odbc_driver_url="https://github.com/ClickHouse/clickhouse-odbc/releases/download/v1.2.1.20220905/clickhouse-odbc-linux.zip"
RUN mkdir -p /tmp/clickhouse-odbc-tmp \
&& cd /tmp/clickhouse-odbc-tmp \
&& wget -nv ${odbc_driver_url} -O /tmp/clickhouse-odbc-tmp/clickhouse-odbc-linux.zip \
&& unzip /tmp/clickhouse-odbc-tmp/clickhouse-odbc-linux.zip -d /tmp/clickhouse-odbc-tmp \
&& tar -xzf /tmp/clickhouse-odbc-tmp/clickhouse-odbc-1.2.1-Linux.tar.gz -C /tmp/clickhouse-odbc-tmp \
&& mkdir -p /usr/local/lib64 \
&& cp /tmp/clickhouse-odbc-tmp/clickhouse-odbc-1.2.1-Linux/lib64/*.so /usr/local/lib64/ \
&& odbcinst -i -d -f /tmp/clickhouse-odbc-tmp/clickhouse-odbc-1.2.1-Linux/share/doc/clickhouse-odbc/config/odbcinst.ini.sample \
&& odbcinst -i -s -l -f /tmp/clickhouse-odbc-tmp/clickhouse-odbc-1.2.1-Linux/share/doc/clickhouse-odbc/config/odbc.ini.sample \
&& rm -rf /tmp/clickhouse-odbc-tmp
ENV TZ=Europe/Amsterdam
ENV MAX_RUN_TIME=900
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
COPY run.sh /
CMD ["/bin/bash", "/run.sh"]

100
docker/test/sqllogic/run.sh Executable file
View File

@ -0,0 +1,100 @@
#!/bin/bash
set -exu
trap "exit" INT TERM
echo "ENV"
env
# fail on errors, verbose and export all env variables
set -e -x -a
echo "Current directory"
pwd
echo "Files in current directory"
ls -la ./
echo "Files in root directory"
ls -la /
echo "Files in /clickhouse-tests directory"
ls -la /clickhouse-tests
echo "Files in /clickhouse-tests/sqllogic directory"
ls -la /clickhouse-tests/sqllogic
echo "Files in /package_folder directory"
ls -la /package_folder
echo "Files in /test_output"
ls -la /test_output
echo "File in /sqllogictest"
ls -la /sqllogictest
dpkg -i package_folder/clickhouse-common-static_*.deb
dpkg -i package_folder/clickhouse-common-static-dbg_*.deb
dpkg -i package_folder/clickhouse-server_*.deb
dpkg -i package_folder/clickhouse-client_*.deb
# install test configs
# /clickhouse-tests/config/install.sh
sudo clickhouse start
sleep 5
for _ in $(seq 1 60); do if [[ $(wget --timeout=1 -q 'localhost:8123' -O-) == 'Ok.' ]]; then break ; else sleep 1; fi ; done
function run_tests()
{
set -x
cd /test_output
/clickhouse-tests/sqllogic/runner.py --help 2>&1 \
| ts '%Y-%m-%d %H:%M:%S'
mkdir -p /test_output/self-test
/clickhouse-tests/sqllogic/runner.py --log-file /test_output/runner-self-test.log \
self-test \
--self-test-dir /clickhouse-tests/sqllogic/self-test \
--out-dir /test_output/self-test \
2>&1 \
| ts '%Y-%m-%d %H:%M:%S'
cat /test_output/self-test/check_status.tsv >> /test_output/check_status.tsv
cat /test_output/self-test/test_results.tsv >> /test_output/test_results.tsv ||:
tar -zcvf self-test.tar.gz self-test 1>/dev/null
if [ -d /sqllogictest ]
then
mkdir -p /test_output/statements-test
/clickhouse-tests/sqllogic/runner.py \
--log-file /test_output/runner-statements-test.log \
--log-level info \
statements-test \
--input-dir /sqllogictest \
--out-dir /test_output/statements-test \
2>&1 \
| ts '%Y-%m-%d %H:%M:%S'
cat /test_output/statements-test/check_status.tsv >> /test_output/check_status.tsv
cat /test_output/statements-test/test_results.tsv >> /test_output/test_results.tsv
tar -zcvf statements-check.tar.gz statements-test 1>/dev/null
fi
}
export -f run_tests
timeout "${MAX_RUN_TIME:-9000}" bash -c run_tests || echo "timeout reached" >&2
#/process_functional_tests_result.py || echo -e "failure\tCannot parse results" > /test_output/check_status.tsv
clickhouse-client -q "system flush logs" ||:
# Stop server so we can safely read data with clickhouse-local.
# Why do we read data with clickhouse-local?
# Because it's the simplest way to read it when server has crashed.
sudo clickhouse stop ||:
for _ in $(seq 1 60); do if [[ $(wget --timeout=1 -q 'localhost:8123' -O-) == 'Ok.' ]]; then sleep 1 ; else break; fi ; done
grep -Fa "Fatal" /var/log/clickhouse-server/clickhouse-server.log ||:
pigz < /var/log/clickhouse-server/clickhouse-server.log > /test_output/clickhouse-server.log.gz &
# Compressed (FIXME: remove once only github actions will be left)
rm /var/log/clickhouse-server/clickhouse-server.log
mv /var/log/clickhouse-server/stderr.log /test_output/ ||:

View File

@ -159,14 +159,14 @@ CI_CONFIG = {
"builds_report_config": {
"ClickHouse build check": [
"package_release",
"coverity",
# "coverity",
"package_aarch64",
"package_asan",
"package_ubsan",
"package_tsan",
"package_msan",
"package_debug",
"binary_release",
# "package_asan",
# "package_ubsan",
# "package_tsan",
# "package_msan",
# "package_debug",
# "binary_release",
],
"ClickHouse special build check": [
"binary_tidy",
@ -375,6 +375,12 @@ CI_CONFIG = {
"SQLancer (debug)": {
"required_build": "package_debug",
},
"Sqllogic test (debug)": {
"required_build": "package_debug",
},
"Sqllogic test (release)": {
"required_build": "package_release",
},
},
} # type: dict
@ -383,12 +389,12 @@ REQUIRED_CHECKS = [
"Fast test",
"Style Check",
"ClickHouse build check",
"ClickHouse special build check",
"Stateful tests (release)",
"Stateless tests (release)",
"Unit tests (release-clang)",
"Unit tests (asan)",
"Unit tests (msan)",
"Unit tests (tsan)",
"Unit tests (ubsan)",
# "ClickHouse special build check",
# "Stateful tests (release)",
# "Stateless tests (release)",
# "Unit tests (release-clang)",
# "Unit tests (asan)",
# "Unit tests (msan)",
# "Unit tests (tsan)",
# "Unit tests (ubsan)",
]

View File

@ -196,7 +196,7 @@ class TestResult:
)
for log_path in log_paths:
file = Path(log_path)
assert file.exists()
assert file.exists(), file
self.log_files.append(file)

212
tests/ci/sqllogic_test.py Executable file
View File

@ -0,0 +1,212 @@
#!/usr/bin/env python3
import argparse
import csv
import logging
import os
import subprocess
import sys
from pathlib import Path
from github import Github
from env_helper import TEMP_PATH, REPO_COPY, REPORTS_PATH
from s3_helper import S3Helper
from get_robot_token import get_best_robot_token
from pr_info import FORCE_TESTS_LABEL, PRInfo
from build_download_helper import download_all_deb_packages
from upload_result_helper import upload_results
from docker_pull_helper import get_image_with_version
from commit_status_helper import override_status, post_commit_status
from report import TestResults, read_test_results
from stopwatch import Stopwatch
from rerun_helper import RerunHelper
from tee_popen import TeePopen
NO_CHANGES_MSG = "Nothing to run"
IMAGE_NAME = "clickhouse/sqllogic-test"
def get_run_command(
builds_path,
repo_tests_path,
result_path,
server_log_path,
kill_timeout,
additional_envs,
image,
):
envs = [
f"-e MAX_RUN_TIME={int(0.9 * kill_timeout)}",
]
envs += [f"-e {e}" for e in additional_envs]
env_str = " ".join(envs)
return (
f"docker run "
f"--volume={builds_path}:/package_folder "
f"--volume={repo_tests_path}:/clickhouse-tests "
f"--volume={result_path}:/test_output "
f"--volume={server_log_path}:/var/log/clickhouse-server "
f"--cap-add=SYS_PTRACE {env_str} {image}"
)
def __files_in_dir(dir_path):
return [
os.path.join(dir_path, f)
for f in os.listdir(dir_path)
if os.path.isfile(os.path.join(dir_path, f))
]
def read_check_status(result_folder):
status_path = os.path.join(result_folder, "check_status.tsv")
if not os.path.exists(status_path):
return "error", "Not found check_status.tsv"
logging.info("Found check_status.tsv")
with open(status_path, "r", encoding="utf-8") as status_file:
status_rows = list(csv.reader(status_file, delimiter="\t"))
for row in status_rows:
if len(row) != 2:
return "error", "Invalid check_status.tsv"
if row[0] != "success":
return row[0], row[1]
return status_rows[-1][0], status_rows[-1][1]
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument("check_name")
parser.add_argument("kill_timeout", type=int)
return parser.parse_args()
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
stopwatch = Stopwatch()
temp_path = TEMP_PATH
repo_path = REPO_COPY
reports_path = REPORTS_PATH
args = parse_args()
check_name = args.check_name
kill_timeout = args.kill_timeout
pr_info = PRInfo()
gh = Github(get_best_robot_token(), per_page=100)
rerun_helper = RerunHelper(gh, pr_info, check_name)
if rerun_helper.is_already_finished_by_status():
logging.info("Check is already finished according to github status, exiting")
sys.exit(0)
if not os.path.exists(temp_path):
os.makedirs(temp_path)
docker_image = get_image_with_version(reports_path, IMAGE_NAME)
repo_tests_path = os.path.join(repo_path, "tests")
packages_path = os.path.join(temp_path, "packages")
if not os.path.exists(packages_path):
os.makedirs(packages_path)
download_all_deb_packages(check_name, reports_path, packages_path)
server_log_path = os.path.join(temp_path, "server_log")
if not os.path.exists(server_log_path):
os.makedirs(server_log_path)
result_path = os.path.join(temp_path, "result_path")
if not os.path.exists(result_path):
os.makedirs(result_path)
run_log_path = os.path.join(result_path, "runlog.log")
additional_envs = [] # type: ignore
run_command = get_run_command( # run script inside docker
packages_path,
repo_tests_path,
result_path,
server_log_path,
kill_timeout,
additional_envs,
docker_image,
)
logging.info("Going to run func tests: %s", run_command)
with TeePopen(run_command, run_log_path) as process:
retcode = process.wait()
if retcode == 0:
logging.info("Run successfully")
else:
logging.info("Run failed")
subprocess.check_call(f"sudo chown -R ubuntu:ubuntu {temp_path}", shell=True)
logging.info("Files in result folder %s", os.listdir(result_path))
s3_helper = S3Helper()
status = None
description = None
additional_logs = []
if os.path.exists(result_path):
additional_logs.extend(__files_in_dir(result_path))
if os.path.exists(server_log_path):
additional_logs.extend(__files_in_dir(server_log_path))
status, description = read_check_status(result_path)
test_results = [] # type: TestResults
test_results_path = Path(result_path) / "test_results.tsv"
if test_results_path.exists():
logging.info("Found test_results.tsv")
test_results = read_test_results(test_results_path)
if len(test_results) > 1000:
test_results = test_results[:1000]
if len(test_results) == 0:
status, description = "error", "Empty test_results.tsv"
assert status is not None
status = override_status(status, check_name)
report_url = upload_results(
s3_helper,
pr_info.number,
pr_info.sha,
test_results,
[run_log_path] + additional_logs,
check_name,
)
print(
f"::notice:: {check_name}"
f", Result: '{status}'"
f", Description: '{description}'"
f", Report url: '{report_url}'"
)
# Until it pass all tests, do not block CI, report "success"
assert description is not None
post_commit_status(gh, pr_info.sha, check_name, description, "success", report_url)
if status != "success":
if FORCE_TESTS_LABEL in pr_info.labels:
print(f"'{FORCE_TESTS_LABEL}' enabled, will report success")
else:
sys.exit(1)

View File

@ -149,5 +149,9 @@
"docker/docs/release": {
"name": "clickhouse/docs-release",
"dependent": []
},
"docker/test/sqllogic": {
"name": "clickhouse/sqllogic-test",
"dependent": []
}
}

View File

@ -0,0 +1,285 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import datetime
import logging
import pyodbc
import sqlite3
import traceback
import enum
import random
import string
from contextlib import contextmanager
from exceptions import ProgramError
logger = logging.getLogger("connection")
logger.setLevel(logging.DEBUG)
class OdbcConnectingArgs:
def __init__(self, **kwargs):
self._kwargs = kwargs
def __str__(self):
conn_str = ";".join(
["{}={}".format(x, y) for x, y in self._kwargs.items() if y]
)
return conn_str
def update_database(self, database):
self._kwargs["Database"] = database
@staticmethod
def create_from_kw(
dsn="", server="localhost", user="default", database="default", **kwargs
):
conn_args = {
"DSN": dsn,
"Server": server,
"User": user,
"Database": database,
}
conn_args.update(kwargs)
return OdbcConnectingArgs(**conn_args)
@staticmethod
def create_from_connection_string(conn_str):
args = OdbcConnectingArgs()
for kv in conn_str.split(";"):
if kv:
k, v = kv.split("=", 1)
args._kwargs[k] = v
return args
def _random_str(length=8):
alphabet = string.ascii_lowercase + string.digits
return "".join(random.SystemRandom().choice(alphabet) for _ in range(length))
def default_clickhouse_odbc_conn_str():
return str(
OdbcConnectingArgs.create_from_kw(
dsn="ClickHouse DSN (ANSI)",
)
)
class Engines(enum.Enum):
SQLITE = enum.auto()
ODBC = enum.auto()
@staticmethod
def list():
return list(map(lambda c: c.name.lower(), Engines))
class KnownDBMS(str, enum.Enum):
sqlite = "sqlite"
clickhouse = "ClickHouse"
class ConnectionWrap(object):
def __init__(self, connection=None, factory=None, factory_kwargs=None):
self._factory = factory
self._factory_kwargs = factory_kwargs
self._connection = connection
self.DBMS_NAME = None
self.DATABASE_NAME = None
self.USER_NAME = None
@staticmethod
def create(connection):
return ConnectionWrap(connection=connection)
@staticmethod
def create_form_factory(factory, factory_kwargs):
return ConnectionWrap(
factory=factory, factory_kwargs=factory_kwargs
).reconnect()
def can_reconnect(self):
return self._factory is not None
def reconnect(self):
if self._connection is not None:
self._connection.close()
self._connection = self._factory(self._factory_kwargs)
return self
def assert_can_reconnect(self):
assert self.can_reconnect(), f"no reconnect for: {self.DBMS_NAME}"
def __getattr__(self, item):
return getattr(self._connection, item)
def __enter__(self):
return self
def drop_all_tables(self):
if self.DBMS_NAME == KnownDBMS.clickhouse.value:
list_query = (
f"SELECT name FROM system.tables WHERE database='{self.DATABASE_NAME}'"
)
elif self.DBMS_NAME == KnownDBMS.sqlite.value:
list_query = f"SELECT name FROM sqlite_master WHERE type='table'"
else:
logger.warning(
"unable to drop all tables for unknown database: %s", self.DBMS_NAME
)
return
list_result = execute_request(list_query, self)
logger.info("tables will be dropped: %s", list_result.get_result())
for table_name in list_result.get_result():
table_name = table_name[0]
execute_request(f"DROP TABLE {table_name}", self).assert_no_exception()
logger.debug("success drop table: %s", table_name)
def _use_database(self, database="default"):
if self.DBMS_NAME == KnownDBMS.clickhouse.value:
logger.info("use test database: %s", database)
self._factory_kwargs.update_database(database)
self.reconnect()
self.DATABASE_NAME = database
def use_random_database(self):
if self.DBMS_NAME == KnownDBMS.clickhouse.value:
database = f"test_{_random_str()}"
execute_request(f"CREATE DATABASE {database}", self).assert_no_exception()
self._use_database(database)
logger.info(
"currentDatabase : %s",
execute_request(f"SELECT currentDatabase()", self).get_result(),
)
@contextmanager
def with_one_test_scope(self):
try:
yield self
finally:
self.drop_all_tables()
@contextmanager
def with_test_database_scope(self):
self.use_random_database()
try:
yield self
finally:
self._use_database()
def __exit__(self, *args):
if hasattr(self._connection, "close"):
return self._connection.close()
def setup_connection(engine, conn_str=None, make_debug_request=True):
connection = None
if isinstance(engine, str):
engine = Engines[engine.upper()]
if engine == Engines.ODBC:
if conn_str is None:
raise ProgramError("conn_str has to be set up for ODBC connection")
logger.debug("Drivers: %s", pyodbc.drivers())
logger.debug("DataSources: %s", pyodbc.dataSources())
logger.debug("Connection string: %s", conn_str)
conn_args = OdbcConnectingArgs.create_from_connection_string(conn_str)
connection = ConnectionWrap.create_form_factory(
factory=lambda args: pyodbc.connect(str(args)),
factory_kwargs=conn_args,
)
connection.add_output_converter(pyodbc.SQL_UNKNOWN_TYPE, lambda x: None)
connection.DBMS_NAME = connection.getinfo(pyodbc.SQL_DBMS_NAME)
connection.DATABASE_NAME = connection.getinfo(pyodbc.SQL_DATABASE_NAME)
connection.USER_NAME = connection.getinfo(pyodbc.SQL_USER_NAME)
elif engine == Engines.SQLITE:
conn_str = conn_str if conn_str is not None else ":memory:"
connection = ConnectionWrap.create(sqlite3.connect(conn_str))
connection.DBMS_NAME = "sqlite"
connection.DATABASE_NAME = "main"
connection.USER_NAME = "default"
logger.info(
"Connection info: DBMS name %s, database %s, user %s",
connection.DBMS_NAME,
connection.DATABASE_NAME,
connection.USER_NAME,
)
if make_debug_request:
request = "SELECT 1"
logger.debug("Make debug request to the connection: %s", request)
result = execute_request(request, connection)
logger.debug("Debug request returned: %s", result.get_result())
logger.debug("Connection is ok")
return connection
class ExecResult:
def __init__(self):
self._exception = None
self._result = None
self._description = None
def as_exception(self, exc):
self._exception = exc
return self
def get_result(self):
self.assert_no_exception()
return self._result
def get_description(self):
self.assert_no_exception()
return self._description
def as_ok(self, rows=None, description=None):
if rows is None:
self._result = True
return self
self._result = rows
self._description = description
return self
def get_exception(self):
return self._exception
def has_exception(self):
return self._exception is not None
def assert_no_exception(self):
if self.has_exception():
raise ProgramError(
f"request doesn't have a result set, it has the exception",
parent=self._exception,
)
def execute_request(request, connection):
cursor = connection.cursor()
try:
cursor.execute(request)
if cursor.description:
logging.debug("request has a description %s", cursor.description)
rows = cursor.fetchall()
connection.commit()
return ExecResult().as_ok(rows=rows, description=cursor.description)
else:
logging.debug("request doesn't have a description")
connection.commit()
return ExecResult().as_ok()
except (pyodbc.Error, sqlite3.DatabaseError) as err:
return ExecResult().as_exception(err)
finally:
cursor.close()

View File

@ -0,0 +1,133 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from enum import Enum
class Error(Exception):
def __init__(
self,
message,
file=None,
name=None,
pos=None,
request=None,
details=None,
*args,
**kwargs,
):
super().__init__(message, *args, **kwargs)
self._file = file
self._name = name
self._pos = pos
self._request = request
self._details = details
@property
def test_file(self):
return self._file
@property
def test_name(self):
return self._name
@property
def test_pos(self):
return self._pos
@property
def request(self):
return self._request
@property
def message(self):
return super().__str__()
@property
def reason(self):
return ", ".join(
(
str(x)
for x in [
super().__str__(),
"details: {}".format(self._details) if self._details else "",
]
if x
)
)
def set_details(self, file=None, name=None, pos=None, request=None, details=None):
if file is not None:
self._file = file
if name is not None:
self._name = name
if pos is not None:
self._pos = pos
if pos is not None:
self._request = request
if request is not None:
self._request = request
if details is not None:
self._details = details
def _at_file_and_pos(self):
if self._file is not None and self._pos is not None:
return f"at: [{self._file}:{self._pos}]"
if self._name is not None and self._pos is not None:
return f"at: [{self._name}:{self._pos}]"
return None
class ErrorWithParent(Error):
def __init__(self, message, parent=None, *args, **kwargs):
super().__init__(message, *args, **kwargs)
self._parent = parent
def get_parent(self):
return self._parent
@property
def reason(self):
return ", ".join(
(
str(x)
for x in [
super().reason,
"exception: {}".format(str(self._parent)) if self._parent else "",
]
if x
)
)
class ProgramError(ErrorWithParent):
def __str__(self):
return self.reason
class DataResultDiffer(Error):
pass
class SchemeResultDiffer(Error):
pass
class StatementExecutionError(ErrorWithParent):
pass
class QueryExecutionError(ErrorWithParent):
pass
class StatementSuccess(Error):
def __init__(self, *args, **kwargs):
message = kwargs["success"] if "message" in kwargs else "success"
super().__init__(message, *args, **kwargs)
class QuerySuccess(Error):
def __init__(self, *args, **kwargs):
message = kwargs["success"] if "message" in kwargs else "success"
super().__init__(message, *args, **kwargs)

428
tests/sqllogic/runner.py Executable file
View File

@ -0,0 +1,428 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import enum
import os
import logging
import csv
import json
import multiprocessing
from functools import reduce
from deepdiff import DeepDiff
from connection import setup_connection, Engines, default_clickhouse_odbc_conn_str
from test_runner import TestRunner, Status, RequestType
LEVEL_NAMES = [x.lower() for x in logging._nameToLevel.keys() if x != logging.NOTSET]
def setup_logger(args):
logging.getLogger().setLevel(logging.NOTSET)
formatter = logging.Formatter(
fmt="%(asctime)s %(levelname)s %(name)s %(filename)s %(funcName)s:%(lineno)d - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
if args.log_file:
file_handler = logging.FileHandler(args.log_file)
file_handler.setLevel(args.log_level.upper())
file_handler.setFormatter(formatter)
logging.getLogger().addHandler(file_handler)
else:
stream_handler = logging.StreamHandler()
stream_handler.setLevel(logging.INFO)
stream_handler.setFormatter(formatter)
logging.getLogger().addHandler(stream_handler)
def __write_check_status(status_row, out_dir):
if len(status_row) > 140:
status_row = status_row[0:135] + "..."
check_status_path = os.path.join(out_dir, "check_status.tsv")
with open(check_status_path, "a") as stream:
writer = csv.writer(stream, delimiter="\t", lineterminator="\n")
writer.writerow(status_row)
class TestNameGranularity(str, enum.Enum):
file = enum.auto()
request = enum.auto()
def __write_test_result(
reports,
out_dir,
mode_name,
granularity=TestNameGranularity.request,
only_errors=None,
):
all_stages = reports.keys()
test_results_path = os.path.join(out_dir, "test_results.tsv")
with open(test_results_path, "a") as stream:
writer = csv.writer(stream, delimiter="\t", lineterminator="\n")
for stage in all_stages:
report = reports[stage]
for test_report in report.tests.values():
test_name_prefix = (
f"sqllogic::{mode_name}::{stage}::{test_report.test_name}"
)
for request_status in test_report.requests.values():
if request_status.status == Status.error or not only_errors:
test_name = test_name_prefix
if granularity == TestNameGranularity.request:
test_name += f"::{request_status.position}"
test_status = "success"
if request_status.status == Status.error:
test_status = "FAIL"
log_row = (
f"position: {request_status.position}"
f", type: {request_status.request_type.name.lower()}"
f", request: '{request_status.request}'"
)
if request_status.status == Status.error:
log_row += f", reason: '{request_status.reason}'"
writer.writerow(
[
test_name,
test_status,
0,
log_row,
]
)
def statements_report(reports, out_dir, mode_name):
__write_test_result(
reports,
out_dir,
mode_name,
granularity=TestNameGranularity.file,
only_errors=True,
)
failed_stages = []
for stage, report in reports.items():
if report.stats.total.fail > 0:
failed_stages.append(stage)
if len(failed_stages) == 0:
status_row = [
"success",
f"All tests from {mode_name} are successful",
]
__write_check_status(status_row, out_dir)
return
stage = max(failed_stages, key=lambda x: reports[x].stats.total.fail)
stats = reports[stage].stats
status_row = [
"error",
f"{stats.total.fail}/{stats.total.all} tests failed at {mode_name}::{stage}",
]
__write_check_status(status_row, out_dir)
def _child_process(setup_kwargs, runner_kwargs, input_dir, output_dir, test):
with setup_connection(**setup_kwargs) as connection:
with connection.with_test_database_scope():
runner = TestRunner(connection, **runner_kwargs)
runner.run_all_tests_from_file(test, input_dir)
runner.write_results_to_dir(output_dir)
return runner.report
def run_all_tests_in_parallel(setup_kwargs, runner_kwargs, input_dir, output_dir):
process_count = max(1, os.cpu_count() - 2)
with multiprocessing.Pool(process_count) as pool:
async_results = [
pool.apply_async(
_child_process,
args=(
setup_kwargs,
runner_kwargs,
input_dir,
output_dir,
test,
),
)
for test in TestRunner.list_tests(input_dir)
]
reports = [ar.get() for ar in async_results]
report = reduce(lambda x, y: x.combine_with(y), reports)
report.write_report(output_dir)
return report
def as_kwargs(**kwargs):
return kwargs
def mode_check_statements(parser):
parser.add_argument("--input-dir", metavar="DIR", required=True)
parser.add_argument("--out-dir", metavar="DIR", required=True)
def calle(args):
input_dir = os.path.realpath(args.input_dir)
out_dir = os.path.realpath(args.out_dir)
if not os.path.exists(input_dir):
raise FileNotFoundError(
input_dir, f"check statements: no such file or directory {input_dir}"
)
if not os.path.isdir(input_dir):
raise NotADirectoryError(
input_dir, f"check statements:: not a dir {input_dir}"
)
reports = dict()
out_stages_dir = os.path.join(out_dir, f"{args.mode}-stages")
complete_sqlite_dir = os.path.join(out_stages_dir, "complete-sqlite")
os.makedirs(complete_sqlite_dir, exist_ok=True)
reports["complete-sqlite"] = run_all_tests_in_parallel(
setup_kwargs=as_kwargs(
engine=Engines.SQLITE,
),
runner_kwargs=as_kwargs(
verify_mode=False,
skip_request_types=[RequestType.query],
stop_at_statement_error=True,
),
input_dir=input_dir,
output_dir=complete_sqlite_dir,
)
verify_clickhouse_dir = os.path.join(out_stages_dir, "verify-clickhouse")
os.makedirs(verify_clickhouse_dir, exist_ok=True)
reports["verify-clickhouse"] = run_all_tests_in_parallel(
setup_kwargs=as_kwargs(
engine=Engines.ODBC,
conn_str=default_clickhouse_odbc_conn_str(),
),
runner_kwargs=as_kwargs(
verify_mode=True,
skip_request_types=[RequestType.query],
stop_at_statement_error=True,
),
input_dir=complete_sqlite_dir,
output_dir=verify_clickhouse_dir,
)
statements_report(reports, out_dir, args.mode)
parser.set_defaults(func=calle)
def make_actual_report(reports):
return {stage: report.get_map() for stage, report in reports.items()}
def write_actual_report(actial, out_dir):
with open(os.path.join(out_dir, "actual_report.json"), "w") as f:
f.write(json.dumps(actial))
def read_canonic_report(input_dir):
file = os.path.join(input_dir, "canonic_report.json")
if not os.path.exists(file):
return {}
with open(os.path.join(input_dir, "canonic_report.json"), "r") as f:
data = f.read()
return json.loads(data)
def write_canonic_report(canonic, out_dir):
with open(os.path.join(out_dir, "canonic_report.json"), "w") as f:
f.write(json.dumps(canonic))
def self_test_report(reports, input_dir, out_dir, mode_name):
actual = make_actual_report(reports)
write_actual_report(actual, out_dir)
canonic = read_canonic_report(input_dir)
write_canonic_report(canonic, out_dir)
status_row = [
"success",
f"All statements from {mode_name} are successful",
]
failed_stages = {}
for stage, actual_report in actual.items():
actual_stats = actual_report["stats"]
if stage not in canonic:
failed_stages[stage] = actual_stats.items()
continue
canonic_report = canonic[stage]
canonic_stats = canonic_report["stats"]
logging.debug("stage: %s, canonic: %s", stage, canonic_stats)
logging.debug("stage: %s, actual: %s", stage, actual_stats)
diff = DeepDiff(actual_stats, canonic_stats)
if len(diff):
failed_stages[stage] = diff
logging.error("diff: %s", diff)
else:
logging.debug("diff: %s", diff)
all_stages = actual.keys()
if len(failed_stages) > 0:
description = f"Failed {len(failed_stages)}/{len(all_stages)} from {mode_name}, stages: {','.join(failed_stages)}"
status_row = ["error", description]
__write_check_status(status_row, out_dir)
def mode_self_test(parser):
parser.add_argument("--self-test-dir", metavar="DIR", required=True)
parser.add_argument("--out-dir", metavar="DIR", required=True)
def calle(args):
self_test_dir = os.path.realpath(args.self_test_dir)
if not os.path.exists(self_test_dir):
raise FileNotFoundError(
self_test_dir, f"self test: no such file or directory {self_test_dir}"
)
if not os.path.isdir(self_test_dir):
raise NotADirectoryError(
self_test_dir, f"self test: not a dir {self_test_dir}"
)
logging.debug("self test dir is: %s", self_test_dir)
out_dir = os.path.realpath(args.out_dir)
if not os.path.exists(out_dir):
raise FileNotFoundError(out_dir, f"self test: dir not found {out_dir}")
if not os.path.isdir(out_dir):
raise NotADirectoryError(out_dir, f"self test: not a dir {out_dir}")
reports = dict()
out_stages_dir = os.path.join(out_dir, f"{args.mode}-stages")
out_dir_sqlite_complete = os.path.join(out_stages_dir, "sqlite-complete")
os.makedirs(out_dir_sqlite_complete, exist_ok=True)
with setup_connection(Engines.SQLITE) as sqlite:
runner = TestRunner(sqlite)
runner.run_all_tests_from_dir(self_test_dir)
runner.write_results_to_dir(out_dir_sqlite_complete)
runner.write_report(out_dir_sqlite_complete)
reports["sqlite-complete"] = runner.report
out_dir_sqlite_vs_sqlite = os.path.join(out_stages_dir, "sqlite-vs-sqlite")
os.makedirs(out_dir_sqlite_vs_sqlite, exist_ok=True)
with setup_connection(Engines.SQLITE) as sqlite:
runner = TestRunner(sqlite)
runner.with_verify_mode()
runner.run_all_tests_from_dir(out_dir_sqlite_complete)
runner.write_results_to_dir(out_dir_sqlite_vs_sqlite)
runner.write_report(out_dir_sqlite_vs_sqlite)
reports["sqlite-vs-sqlite"] = runner.report
out_dir_clickhouse_complete = os.path.join(
out_stages_dir, "clickhouse-complete"
)
os.makedirs(out_dir_clickhouse_complete, exist_ok=True)
with setup_connection(
Engines.ODBC, default_clickhouse_odbc_conn_str()
) as clickhouse:
runner = TestRunner(clickhouse)
runner.run_all_tests_from_dir(self_test_dir)
runner.write_results_to_dir(out_dir_clickhouse_complete)
runner.write_report(out_dir_clickhouse_complete)
reports["clickhouse-complete"] = runner.report
out_dir_clickhouse_vs_clickhouse = os.path.join(
out_stages_dir, "clickhouse-vs-clickhouse"
)
os.makedirs(out_dir_clickhouse_vs_clickhouse, exist_ok=True)
with setup_connection(
Engines.ODBC, default_clickhouse_odbc_conn_str()
) as clickhouse:
runner = TestRunner(clickhouse)
runner.with_verify_mode()
runner.run_all_tests_from_dir(out_dir_clickhouse_complete)
runner.write_results_to_dir(out_dir_clickhouse_vs_clickhouse)
runner.write_report(os.path.join(out_dir_clickhouse_vs_clickhouse))
reports["clickhouse-vs-clickhouse"] = runner.report
out_dir_sqlite_vs_clickhouse = os.path.join(
out_stages_dir, "sqlite-vs-clickhouse"
)
os.makedirs(out_dir_sqlite_vs_clickhouse, exist_ok=True)
reports["sqlite-vs-clickhouse"] = run_all_tests_in_parallel(
setup_kwargs=as_kwargs(
engine=Engines.ODBC,
conn_str=default_clickhouse_odbc_conn_str(),
),
runner_kwargs=as_kwargs(
verify_mode=True,
),
input_dir=out_dir_sqlite_complete,
output_dir=out_dir_sqlite_vs_clickhouse,
)
self_test_report(reports, self_test_dir, out_dir, args.mode)
parser.set_defaults(func=calle)
def parse_args():
parser = argparse.ArgumentParser(
description="This script runs sqllogic tests over database."
)
parser.add_argument("--log-file", help="write logs to the file", metavar="FILE")
parser.add_argument(
"--log-level",
help="define the log level for log file",
metavar="level",
choices=LEVEL_NAMES,
default="debug",
)
subparsers = parser.add_subparsers(dest="mode")
mode_check_statements(
subparsers.add_parser(
"statements-test",
help="Run all test. Check that all statements are passed",
)
)
mode_self_test(
subparsers.add_parser(
"self-test",
help="Run all test. Check that all statements are passed",
)
)
args = parser.parse_args()
if args.mode is None:
parser.print_help()
return args
def main():
args = parse_args()
setup_logger(args)
if args.mode is not None:
args.func(args)
if __name__ == "__main__":
main()

File diff suppressed because one or more lines are too long

View File

@ -0,0 +1,145 @@
onlyif ClickHouse
statement ok
CREATE TABLE t1(a INTEGER, b INTEGER) ENGINE = MergeTree() PRIMARY KEY tuple()
skipif ClickHouse
statement ok
CREATE TABLE t1(a INTEGER, b INTEGER)
statement ok
INSERT INTO t1(a,b) VALUES(1,2)
statement ok
INSERT INTO t1(a,b) VALUES(3,4)
statement ok
INSERT INTO t1(a,b) VALUES(5,6)
# just ok request
query II nosort
SELECT a, b
FROM t1
ORDER BY 2,1
----
# will fail and write exception as a result
query II nosort
SELECT a, c
FROM t1
ORDER BY 2,1
----
# expect to fail
onlyif ClickHouse
query error UNKNOWN_IDENTIFIER
SELECT a, c FROM t1
----
# expect to fail
onlyif sqlite
query error No such column
SELECT a, c FROM t1
----
# expect to fail in a different way
query error expect to fail in a different way
SELECT a, c FROM t1
----
# print empty as (empty)
query T nosort
SELECT ''
----
(empty)
# without result set
query T nosort
SELECT ''
----
# without result and saparator
query T nosort
SELECT ''
# just ok with REAL
query R nosort
SELECT -1.0
----
-1.000
# just ok with signed
query I nosort
SELECT -1
----
-1
# just ok
query RI nosort
SELECT 1.0, 1
----
1.000 1
# mess with columns count
query R nosort
SELECT 1.0, 1
----
1.000 1
# mess with colums count
query RT nosort
SELECT 1.0
----
1.000
# empty result set
query II nosort
select a, b from t1 where a = b
----
# precise is 3 digits
query R nosort
SELECT 1.0013
----
1.001
query T nosort
SELECT NULL as a
----
NULL
onlyif ClickHouse
query I nosort
SELECT CAST(NULL AS Nullable(INTEGER))
----
NULL
query T nosort
SELECT NULL
----
NULL NULL
# thish check how result hashing works
query IIIIIIIIIIIIIII nosort
SELECT 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
----
skipif ClickHouse
query I nosort
WITH RECURSIVE
cnt(x) AS (
SELECT 1
UNION ALL
SELECT x+1 FROM cnt
LIMIT 20
)
SELECT x FROM cnt;
----
20 values hashing to 52c46dff81346ead02fcf6245c762b1a
onlyif ClickHouse
query I nosort
SELECT number+1 from system.numbers LIMIT 20
----
20 values hashing to 52c46dff81346ead02fcf6245c762b1a

609
tests/sqllogic/test_parser.py Executable file
View File

@ -0,0 +1,609 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import logging
import os
from itertools import chain
from enum import Enum
from hashlib import md5
from functools import reduce
from exceptions import Error, ProgramError, ErrorWithParent, DataResultDiffer
logger = logging.getLogger("parser")
logger.setLevel(logging.DEBUG)
CONDITION_SKIP = "skipif"
CONDITION_ONLY = "onlyif"
# TODO replace assertions with raise exception
class TestFileFormatException(Error):
pass
class FileAndPos:
def __init__(self, file=None, pos=None):
self.file = file
self.pos = pos
def __str__(self):
return f"{self.file}:{self.pos}"
def check_conditions(conditions, dbms_name):
rules = {}
for rec in conditions:
key, val = rec
if key not in conditions:
rules[key] = []
rules[key].append(val)
if CONDITION_SKIP in rules:
if dbms_name in rules[CONDITION_SKIP]:
return False
if CONDITION_ONLY in rules:
if dbms_name not in rules[CONDITION_ONLY]:
return False
return True
class BlockType(Enum):
comments = 1
control = 2
statement = 3
query = 4
COMMENT_TOKENS = ["#"]
RESULT_SEPARATION_LINE = "----"
CONTROL_TOKENS = ["halt", "hash-threshold"]
CONDITIONS_TOKENS = [CONDITION_SKIP, CONDITION_ONLY]
STATEMENT_TOKEN = "statement"
QUERY_TOKEN = "query"
ACCEPTABLE_TYPES = {type(""): "T", type(1): "I", type(0.001): "R"}
def _is_comment_line(tokens):
return tokens and tokens[0][0] in COMMENT_TOKENS
def _is_separation_line(tokens):
return tokens and tokens[0] == RESULT_SEPARATION_LINE
def _is_control_line(tokens):
return tokens and tokens[0] in CONTROL_TOKENS
def _is_conditional_line(tokens):
return tokens and tokens[0] in CONDITIONS_TOKENS
def _is_statement_line(tokens):
return tokens and tokens[0] == STATEMENT_TOKEN
def _is_query_line(tokens):
return tokens and tokens[0] == QUERY_TOKEN
class FileBlockBase:
def __init__(self, parser, start, end):
self._parser = parser
self._start = start
self._end = end
def get_block_type(self):
pass
def get_pos(self):
return self._start + 1
@staticmethod
def __parse_request(test_file, start, end):
request_end = start
while request_end < end:
tokens = test_file.get_tokens(request_end)
if not tokens or _is_separation_line(tokens):
break
request_end += 1
request = test_file.get_tokens_from_lines(start, request_end)
logger.debug("slice request %s:%s end %s", start, request_end, end)
return " ".join(request), request_end
@staticmethod
def __parse_result(test_file, start, end):
result_end = start
while result_end < end:
tokens = test_file.get_tokens(result_end)
if not tokens:
break
result_end += 1
logger.debug("slice result %s:%s end %s", start, result_end, end)
result = test_file.get_tokens(start, result_end)
return result, result_end
@staticmethod
def parse_block(parser, start, end):
file_pos = FileAndPos(parser.get_test_name(), start + 1)
logger.debug("%s start %s end %s", file_pos, start, end)
block_type = BlockType.comments
conditions = []
controls = []
statement = None
query = None
request = []
result_line = None
result = []
line = start
while line < end:
tokens = parser.get_tokens(line)
if _is_comment_line(tokens):
pass
elif _is_conditional_line(tokens):
conditions.append(parser.get_tokens(line))
elif _is_control_line(tokens):
assert block_type in (BlockType.comments, BlockType.control)
block_type = BlockType.control
controls.append(parser.get_tokens(line))
elif _is_statement_line(tokens):
assert block_type in (BlockType.comments,)
block_type = BlockType.statement
statement = parser.get_tokens(line)
request, last_line = FileBlockBase.__parse_request(
parser, line + 1, end
)
assert last_line == end
line = last_line
elif _is_query_line(tokens):
assert block_type in (BlockType.comments,)
block_type = BlockType.query
query = parser.get_tokens(line)
request, last_line = FileBlockBase.__parse_request(
parser, line + 1, end
)
result_line = last_line
line = last_line
if line == end:
break
tokens = parser.get_tokens(line)
assert _is_separation_line(tokens), f"last_line {last_line}, end {end}"
result, last_line = FileBlockBase.__parse_result(parser, line + 1, end)
assert last_line == end
line = last_line
line += 1
if block_type == BlockType.comments:
return FileBlockComments(parser, start, end)
if block_type == BlockType.control:
return FileBlockControl(parser, start, end, conditions, controls)
if block_type == BlockType.statement:
return FileBlockStatement(
parser, start, end, conditions, statement, request
)
if block_type == BlockType.query:
block = FileBlockQuery(
parser, start, end, conditions, query, request, result_line
)
block.with_result(result)
return block
def dump_to(self, output):
if output is None:
return
for line in range(self._start, self._end):
output.write(self._parser.get_line(line))
output.write("\n")
class FileBlockComments(FileBlockBase):
def __init__(self, parser, start, end):
super().__init__(parser, start, end)
def get_block_type(self):
return BlockType.comments
class FileBlockControl(FileBlockBase):
def __init__(self, parser, start, end, conditions, control):
super().__init__(parser, start, end)
self.conditions = conditions
self.control = control
def get_block_type(self):
return BlockType.control
def get_conditions(self):
return self.conditions
class FileBlockStatement(FileBlockBase):
def __init__(self, parser, start, end, conditions, statement, request):
super().__init__(parser, start, end)
self.conditions = conditions
self.statement = statement
self.request = request
def get_block_type(self):
return BlockType.statement
def get_request(self):
return self.request
def get_conditions(self):
return self.conditions
def get_statement(self):
return self.statement
def expected_error(self):
return self.statement[1] == "error"
class FileBlockQuery(FileBlockBase):
def __init__(self, parser, start, end, conditions, query, request, result_line):
super().__init__(parser, start, end)
self.conditions = conditions
self.query = query
self.request = request
self.result = None
self.result_line = result_line
def get_block_type(self):
return BlockType.query
def get_request(self):
return self.request
def get_conditions(self):
return self.conditions
def get_query(self):
return self.query
def expected_error(self):
return " ".join(self.query[2:]).lower() if self.query[1] == "error" else None
def get_types(self):
if self.query[1] == "error":
raise TestFileFormatException(
"the query is expected to fail, there are no types"
)
return self.query[1]
def get_sort_mode(self):
return self.query[2]
def get_result(self):
return self.result
def with_result(self, result):
self.result = result
def dump_to(self, output):
if output is None:
return
for line in range(self._start, self.result_line):
output.write(self._parser.get_line(line))
if self.result is not None:
logger.debug("dump result %s", self.result)
output.write("----\n")
for row in self.result:
output.write(" ".join(row) + "\n")
output.write("\n")
class TestFileParser:
CONTROL_TOKENS = ["halt", "hash-threshold"]
CONDITIONS_TOKENS = [CONDITION_SKIP, CONDITION_ONLY]
STATEMENT_TOKEN = "statement"
QUERY_TOKEN = "query"
COMMENT_TOKEN = "#"
DEFAULT_HASH_THRESHOLD = 8
def __init__(self, stream, test_name, test_file):
self._stream = stream
self._test_name = test_name
self._test_file = test_file
self._lines = []
self._raw_tokens = []
self._tokens = []
self._empty_lines = []
def get_test_name(self):
return self._test_name
def get_test_file(self):
if self._test_file is not None:
return self._test_file
return self._test_name
def get_line(self, line):
return self._lines[line]
def get_tokens(self, start, end=None):
if end is None:
return self._tokens[start]
else:
return self._tokens[start:end]
def get_tokens_from_lines(self, start, end):
return list(chain(*self._tokens[start:end]))
def __load_file(self):
self._lines = self._stream.readlines()
self._raw_tokens = [line.split() for line in self._lines]
assert len(self._lines) == len(self._raw_tokens)
self._tokens = []
for line in self._raw_tokens:
if self.COMMENT_TOKEN in line:
comment_starts_at = line.index(self.COMMENT_TOKEN)
self._tokens.append(line[0:comment_starts_at])
else:
self._tokens.append(line)
self._empty_lines = [i for i, x in enumerate(self._raw_tokens) if len(x) == 0]
logger.debug(
"Test file %s loaded rows %s, empty rows %s",
self.get_test_file(),
len(self._lines),
len(self._empty_lines),
)
def __unload_file(self):
self._test_file = None
self._test_name = None
self._stream = None
self._lines = []
self._raw_tokens = []
self._tokens = []
self._empty_lines = []
def _iterate_blocks(self):
prev = 0
for i in self._empty_lines:
if prev != i:
yield FileBlockBase.parse_block(self, prev, i)
prev = i + 1
if prev != len(self._lines):
yield FileBlockBase.parse_block(self, prev, len(self._lines))
def test_blocks(self):
try:
self.__load_file()
yield from self._iterate_blocks()
finally:
self.__unload_file()
class QueryResult:
def __init__(
self,
rows=None,
values_count=None,
data_hash=None,
exception=None,
hash_threshold=0,
):
self.rows = rows
self.values_count = values_count
self.data_hash = data_hash
self.exception = exception
self.hash_threshold = hash_threshold
self.hash_it()
logger.debug("created QueryResult %s", str(self))
def __str__(self):
params = ", ".join(
(
str(x)
for x in [
"rows: {}".format(self.rows) if self.rows else "",
"values_count: {}".format(self.values_count)
if self.values_count
else "",
"data_hash: {}".format(self.data_hash) if self.data_hash else "",
"exception: {}".format(self.exception) if self.exception else "",
"hash_threshold: {}".format(self.hash_threshold)
if self.hash_threshold
else "",
]
if x
)
)
return "QueryResult({})".format(params)
def __iter__(self):
if self.rows is not None:
if self.hash_threshold == 0:
return iter(self.rows)
if self.values_count <= self.hash_threshold:
return iter(self.rows)
if self.data_hash is not None:
return iter(
[["{} values hashing to {}".format(self.values_count, self.data_hash)]]
)
if self.exception is not None:
return iter([["exception: {}".format(self.exception)]])
raise ProgramError("Query result is empty", details="{}".format(self.__str__()))
@staticmethod
def __value_count(rows):
return reduce(lambda a, b: a + len(b), rows, 0)
@staticmethod
def parse_it(rows, hash_threshold):
logger.debug("parse result len: %s rows: %s", len(rows), rows)
if len(rows) == 1:
logger.debug("one row is %s", rows)
if len(rows[0]) > 0 and rows[0][0] == "exception:":
logging.debug("as exception")
message = " ".join(rows[0][1:])
return QueryResult(exception=message)
if len(rows[0]) == 5 and " ".join(rows[0][1:4]) == "values hashing to":
logging.debug("as hashed data")
values_count = int(rows[0][0])
data_hash = rows[0][4]
return QueryResult(data_hash=data_hash, values_count=values_count)
logger.debug("as data")
values_count = QueryResult.__value_count(rows)
return QueryResult(
rows=rows, values_count=values_count, hash_threshold=hash_threshold
)
@staticmethod
def __result_as_strings(rows, types):
res = []
for row in rows:
res_row = []
for c, t in zip(row, types):
if c is None:
res_row.append("NULL")
continue
if t == "T":
if c == "":
res_row.append("(empty)")
else:
res_row.append(str(c))
elif t == "I":
res_row.append(str(int(c)))
elif t == "R":
res_row.append(f"{c:.3f}")
res.append(res_row)
return res
@staticmethod
def __sort_result(rows, sort_mode):
if sort_mode == "nosort":
return rows
if sort_mode == "rowsort":
return sorted(rows)
if sort_mode == "valuesort":
values = list(chain(*rows))
values.sort()
return [values] if values else []
@staticmethod
def __calculate_hash(rows):
md5_hash = md5()
for row in rows:
for value in row:
md5_hash.update(value.encode("ascii"))
return str(md5_hash.hexdigest())
@staticmethod
def make_it(rows, types, sort_mode, hash_threshold):
values_count = QueryResult.__value_count(rows)
as_string = QueryResult.__result_as_strings(rows, types)
as_sorted = QueryResult.__sort_result(as_string, sort_mode)
return QueryResult(
rows=as_sorted, values_count=values_count, hash_threshold=hash_threshold
)
def hash_it(self):
if self.rows is not None and self.data_hash is None:
self.data_hash = QueryResult.__calculate_hash(self.rows)
return self
@staticmethod
def as_exception(e):
# do not print details to the test file
# but print original exception
if isinstance(e, ErrorWithParent):
message = "{}, original is: {}".format(e, e.get_parent())
else:
message = "{}".format(e)
return QueryResult(exception=message)
@staticmethod
def assert_eq(canonic, actual):
if not isinstance(canonic, QueryResult):
raise ProgramError("NotImplemented")
if not isinstance(actual, QueryResult):
raise ProgramError("NotImplemented")
if canonic.exception is not None or actual.exception is not None:
if canonic.exception is not None and actual.exception is not None:
if canonic.exception != actual.exception:
raise DataResultDiffer(
"canonic and actual results have different exceptions",
details=f"canonic: {canonic.exception}, actual: {actual.exception}",
)
else:
# exceptions are the same
return
elif canonic.exception is not None:
raise DataResultDiffer(
"canonic result has exception and actual result doesn't",
details=f"canonic: {canonic.exception}",
)
else:
raise DataResultDiffer(
"actual result has exception and canonic result doesn't",
details=f"actual: {actual.exception}",
)
canonic.hash_it()
actual.hash_it()
if canonic.data_hash is not None:
if actual.data_hash is None:
raise ProgramError("actual result has to have hash for data")
if canonic.values_count != actual.values_count:
raise DataResultDiffer(
"canonic and actual results have different value count",
details="canonic values count {}, actual {}".format(
canonic.values_count, actual.values_count
),
)
if canonic.data_hash != actual.data_hash:
raise DataResultDiffer(
"canonic and actual results have different hashes"
)
return
if canonic.rows is not None and actual.rows is not None:
if canonic.values_count != actual.values_count:
raise DataResultDiffer(
"canonic and actual results have different value count",
details="canonic values count {}, actual {}".format(
canonic.values_count, actual.values_count
),
)
if canonic.rows != actual.rows:
raise DataResultDiffer(
"canonic and actual results have different values"
)
return
raise ProgramError(
"Unable to compare results",
details="actual {}, canonic {}".format(actual, canonic),
)

View File

@ -0,0 +1,584 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import enum
import logging
import os
import traceback
import io
import json
import test_parser
from exceptions import (
Error,
ProgramError,
DataResultDiffer,
StatementExecutionError,
StatementSuccess,
QueryExecutionError,
QuerySuccess,
SchemeResultDiffer,
)
from connection import execute_request
logger = logging.getLogger("parser")
logger.setLevel(logging.DEBUG)
def _list_files(path):
logger.debug("list files in %s, type %s", path, type(path))
if not isinstance(path, str):
raise ProgramError("NotImplemented")
if os.path.isfile(path):
yield path
else:
with os.scandir(path) as it:
for entry in it:
yield from _list_files(entry.path)
def _filter_files(suffix, files):
yield from (path for path in files if path.endswith(suffix))
class RequestType(str, enum.Enum):
statement = enum.auto()
query = enum.auto()
class Status(str, enum.Enum):
success = "success"
error = "error"
class TestStatus:
def __init__(self):
self.status = None
self.file = None
self.position = None
self.request_type = None
self.request = None
self.reason = None
def get_map(self):
return {
"status": self.status.name.lower(),
# "file": self.file,
"position": self.position,
"request_type": self.request_type.name.lower(),
"request": self.request,
"reason": self.reason,
}
@staticmethod
def __from_error(err):
if isinstance(err, Error):
result = TestStatus()
result.name = err.test_name
result.file = err.test_file
result.position = err.test_pos
result.request = err.request
result.reason = err.reason
return result
raise ProgramError("NotImplemented")
@staticmethod
def from_exception(ex):
result = TestStatus.__from_error(ex)
if isinstance(ex, StatementSuccess):
result.status = Status.success
result.request_type = RequestType.statement
elif isinstance(ex, StatementExecutionError):
result.status = Status.error
result.request_type = RequestType.statement
elif isinstance(ex, QuerySuccess):
result.status = Status.success
result.request_type = RequestType.query
elif isinstance(ex, QueryExecutionError):
result.status = Status.error
result.request_type = RequestType.query
elif isinstance(ex, SchemeResultDiffer):
result.status = Status.error
result.request_type = RequestType.query
elif isinstance(ex, DataResultDiffer):
result.status = Status.error
result.request_type = RequestType.query
else:
raise ProgramError("NotImplemented", parent=ex)
return result
class SimpleStats:
def __init__(self, general=None):
self._general = general
self._success = 0
self._fail = 0
@property
def all(self):
return self._success + self.fail
@property
def success(self):
return self._success
@success.setter
def success(self, value):
if self._general is not None:
self._general.success += value - self._success
self._success = value
@property
def fail(self):
return self._fail
@fail.setter
def fail(self, value):
if self._general is not None:
self._general.fail += value - self._fail
self._fail = value
def __repr__(self):
return str(self.get_map())
def update(self, status):
if not isinstance(status, TestStatus):
raise ProgramError("NotImplemented")
if status.status == Status.error:
self.fail += 1
else:
self.success += 1
def get_map(self):
result = dict()
result["success"] = self.success
result["fail"] = self.fail
return result
def combine_with(self, right):
if not isinstance(right, SimpleStats):
raise ProgramError("NotImplemented")
self.success += right.success
self.fail += right.fail
class Stats:
def __init__(self):
self.total = SimpleStats()
self.statements = SimpleStats(self.total)
self.queries = SimpleStats(self.total)
def __repr__(self):
return str(self.get_map())
def update(self, status):
if not isinstance(status, TestStatus):
raise ProgramError("NotImplemented")
if status.request_type == RequestType.query:
choose = self.queries
else:
choose = self.statements
choose.update(status)
def get_map(self):
result = dict()
result["statements"] = self.statements.get_map()
result["queries"] = self.queries.get_map()
result["total"] = self.total.get_map()
return result
def combine_with(self, right):
if not isinstance(right, Stats):
raise ProgramError("NotImplemented")
self.statements.combine_with(right.statements)
self.queries.combine_with(right.queries)
class OneReport:
def __init__(self, test_name, test_file):
self.test_name = test_name
self.test_file = test_file
self.stats = Stats()
self.requests = dict() # type: dict(int, TestStatus)
def update(self, status):
if not isinstance(status, TestStatus):
raise ProgramError("NotImplemented")
self.stats.update(status)
self.requests[status.position] = status
def __repr__(self):
return str(self.get_map())
def get_map(self):
result = dict()
result["test_name"] = self.test_name
result["test_file"] = self.test_file
result["stats"] = self.stats.get_map()
result["requests"] = dict()
requests = result["requests"]
for pos, status in self.requests.items():
requests[pos] = status.get_map()
return result
class Report:
def __init__(self, dbms_name, input_dir=None):
self.dbms_name = dbms_name
self.stats = Stats()
self.tests = dict() # type: dict(str, OneReport)
self.input_dir = input_dir
self.output_dir = None
def update(self, status):
if not isinstance(status, TestStatus):
raise ProgramError("NotImplemented")
self.stats.update(status)
self.__get_file_report(status).update(status)
def __get_file_report(self, status):
if status.name not in self.tests:
self.tests[status.name] = OneReport(status.name, status.file)
return self.tests[status.name]
def __repr__(self):
return str(self.get_map())
def assign_result_dir(self, res_dir):
self.output_dir = res_dir
def get_map(self):
result = dict()
result["dbms_name"] = self.dbms_name
result["stats"] = self.stats.get_map()
result["input_dir"] = self.input_dir
if self.input_dir is not None:
result["input_dir"] = self.input_dir
if self.output_dir is not None:
result["output_dir"] = self.output_dir
result["tests"] = dict()
tests = result["tests"]
for test_name, one_report in self.tests.items():
tests.update({test_name: one_report.get_map()})
return result
def combine_with(self, right):
if not isinstance(right, Report):
raise ProgramError("NotImplemented")
if self.dbms_name != right.dbms_name:
raise ProgramError("reports are attached to the different databases")
if self.input_dir is None or right.input_dir is None:
raise ProgramError("can't compare input dirs")
if self.input_dir != right.input_dir:
raise ProgramError(
"can't combine reports, they are attached to the different input dirs"
)
for test_name in right.tests.keys():
if test_name in self.tests:
raise ProgramError(
f"can't combine reports, they have intersect tests, {test_name}"
)
self.tests.update(right.tests)
self.stats.combine_with(right.stats)
return self
def write_report(self, report_dir):
report_path = os.path.join(report_dir, "report.json")
logger.info(f"create file {report_path}")
with open(report_path, "w") as stream:
stream.write(json.dumps(self.get_map(), indent=4))
class TestRunner:
def __init__(
self,
connection,
verify_mode=None,
skip_request_types=None,
stop_at_statement_error=None,
):
self.connection = connection
self.verify = False if verify_mode is None else verify_mode
self.skip_request_types = []
if skip_request_types is not None:
for req_type in skip_request_types:
self.with_skip(req_type)
self.stop_at_statement_error = (
False if stop_at_statement_error is None else stop_at_statement_error
)
self.dbms_name = connection.DBMS_NAME
self.report = None
self.results = None
self._input_dir = None
def with_verify_mode(self):
self.verify = True
return self
def with_completion_mode(self):
self.verify = False
return self
def with_skip(self, type_request):
if type_request == RequestType.query:
self.skip_request_types.append(test_parser.BlockType.query)
if type_request == RequestType.statement:
self.skip_request_types.append(test_parser.BlockType.statement)
def __statuses(self, parser, out_stream):
skip_rest = False
for block in parser.test_blocks():
test_file = parser.get_test_file()
test_name = parser.get_test_name()
position = block.get_pos()
name_pos = f"{test_name}:{position}"
clogger = logging.getLogger(f"parser at {name_pos}")
if skip_rest:
clogger.debug("Skip rest blocks")
block.dump_to(out_stream)
continue
if block.get_block_type() == test_parser.BlockType.comments:
clogger.debug("Skip comment block")
block.dump_to(out_stream)
continue
if block.get_block_type() == test_parser.BlockType.control:
clogger.debug("Skip control block", name_pos)
block.dump_to(out_stream)
continue
clogger.debug("Request <%s>", block.get_request())
cond_lines = block.get_conditions()
if not test_parser.check_conditions(cond_lines, self.dbms_name):
clogger.debug("Conditionally skip block for %s", self.dbms_name)
block.dump_to(out_stream)
continue
request = block.get_request()
exec_res = execute_request(request, self.connection)
if block.get_block_type() in self.skip_request_types:
clogger.debug("Runtime skip block for %s", self.dbms_name)
block.dump_to(out_stream)
continue
if block.get_block_type() == test_parser.BlockType.statement:
try:
clogger.debug("this is statement")
if block.expected_error():
clogger.debug("error is expected")
if not exec_res.has_exception():
raise StatementExecutionError(
"statement request did not fail as expected"
)
else:
clogger.debug("ok is expected")
if exec_res.has_exception():
raise StatementExecutionError(
"statement failed with exception",
parent=exec_res.get_exception(),
)
raise StatementSuccess()
except StatementSuccess as ok:
clogger.debug("statement is ok")
ok.set_details(
file=test_file, name=test_name, pos=position, request=request
)
block.dump_to(out_stream)
yield TestStatus.from_exception(ok)
except StatementExecutionError as err:
err.set_details(
file=test_file, name=test_name, pos=position, request=request
)
clogger.critical("Unable to execute statement, %s", err.reason)
block.dump_to(out_stream)
if self.stop_at_statement_error:
clogger.critical("Will skip the rest of the file")
skip_rest = True
yield TestStatus.from_exception(err)
if block.get_block_type() == test_parser.BlockType.query:
try:
clogger.debug("this is query")
expected_error = block.expected_error()
if expected_error:
clogger.debug("error is expected %s", expected_error)
if exec_res.has_exception():
e = exec_res.get_exception()
clogger.debug("had error %s", e)
message = str(e).lower()
if expected_error not in message:
clogger.debug("errors differed")
raise QueryExecutionError(
"query is expected to fail with different error",
details=f"expected error: {expected_error}",
parent=exec_res.get_exception(),
)
else:
clogger.debug("errors matched")
raise QuerySuccess()
else:
clogger.debug("missed error")
raise QueryExecutionError(
"query is expected to fail with error",
details="expected error: {}".format(expected_error),
)
else:
clogger.debug("success is expected")
if exec_res.has_exception():
clogger.debug("had error")
if self.verify:
clogger.debug("verify mode")
canonic = test_parser.QueryResult.parse_it(
block.get_result(), 10
)
exception = QueryExecutionError(
"query execution failed with an exception",
parent=exec_res.get_exception(),
)
actual = test_parser.QueryResult.as_exception(exception)
test_parser.QueryResult.assert_eq(canonic, actual)
block.with_result(actual)
raise QuerySuccess()
else:
clogger.debug("completion mode")
raise QueryExecutionError(
"query execution failed with an exception",
parent=exec_res.get_exception(),
)
canonic_types = block.get_types()
clogger.debug("canonic types %s", canonic_types)
if len(exec_res.get_result()) > 0:
actual_columns_count = len(exec_res.get_result()[0])
canonic_columns_count = len(canonic_types)
if canonic_columns_count != actual_columns_count:
raise SchemeResultDiffer(
"canonic and actual columns count differ",
details="expected columns {}, actual columns {}".format(
canonic_columns_count, actual_columns_count
),
)
actual = test_parser.QueryResult.make_it(
exec_res.get_result(), canonic_types, block.get_sort_mode(), 10
)
if self.verify:
clogger.debug("verify mode")
canonic = test_parser.QueryResult.parse_it(
block.get_result(), 10
)
test_parser.QueryResult.assert_eq(canonic, actual)
block.with_result(actual)
raise QuerySuccess()
except QuerySuccess as ok:
ok.set_details(
file=test_file, name=test_name, pos=position, request=request
)
clogger.debug("query ok")
block.dump_to(out_stream)
yield TestStatus.from_exception(ok)
except Error as err:
err.set_details(
file=test_file, name=test_name, pos=position, request=request
)
clogger.warning(
"Query has failed with exception: %s",
err.reason,
)
block.with_result(test_parser.QueryResult.as_exception(err))
block.dump_to(out_stream)
yield TestStatus.from_exception(err)
def run_one_test(self, stream, test_name, test_file):
if self._input_dir is not None:
if not test_file.startswith(self._input_dir):
raise ProgramError(
f"that runner instance is attached to tests in dir {self._input_dir}"
f", can't run with file {test_file}"
)
else:
self._input_dir = os.path.dirname(test_file)
if self.report is None:
self.report = Report(self.dbms_name, self._input_dir)
if self.results is None:
self.results = dict()
with self.connection.with_one_test_scope():
out_stream = io.StringIO()
self.results[test_name] = out_stream
parser = test_parser.TestFileParser(stream, test_name, test_file)
for status in self.__statuses(parser, out_stream):
self.report.update(status)
def _assert_input_dir(self, input_dir):
if self._input_dir is not None:
if self._input_dir != input_dir:
raise ProgramError(
f"that runner instance is attached to tests in dir {self._input_dir}"
f", can't run with {input_dir}"
)
def run_all_tests_from_file(self, test_file, input_dir=None):
self._assert_input_dir(input_dir)
self._input_dir = input_dir
if self._input_dir is None:
self._input_dir = os.path.dirname(test_file)
test_name = os.path.relpath(test_file, start=self._input_dir)
logger.debug("open file %s", test_name)
with open(test_file, "r") as stream:
self.run_one_test(stream, test_name, test_file)
def run_all_tests_from_dir(self, input_dir):
self._assert_input_dir(input_dir)
self._input_dir = input_dir
for file_path in TestRunner.list_tests(self._input_dir):
self.run_all_tests_from_file(file_path, self._input_dir)
def write_results_to_dir(self, dir_path):
if not os.path.isdir(dir_path):
raise NotADirectoryError(dir_path)
self.report.assign_result_dir(dir_path)
for test_name, stream in self.results.items():
test_file = os.path.join(dir_path, test_name)
logger.info(f"create file {test_file}")
result_dir = os.path.dirname(test_file)
os.makedirs(result_dir, exist_ok=True)
with open(test_file, "w") as output:
output.write(stream.getvalue())
def write_report(self, report_dir):
self.report.write_report(report_dir)
@staticmethod
def list_tests(input_dir):
yield from _filter_files(".test", _list_files(input_dir))