Merge pull request #60408 from ClickHouse/update-style-python

Update style python
Mikhail f. Shiryaev, 2024-03-07 09:32:28 +01:00, committed by GitHub
commit 5e597228d7
51 changed files with 605 additions and 603 deletions
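The bulk of the change is mechanical and repeats a few patterns across all 51 files: newer pinned linters (black 23.12.0, mypy 1.8.0, pylint 3.1.0), isort-ordered imports, specific exception classes instead of bare Exception, an explicit timeout= on every requests call, encoding="utf-8" on every open(), dict and f-string literals, and context managers around Popen and ZipFile. A minimal sketch of the file-handling and exception pattern (the helper name and path are illustrative, not taken from the diff):

import json
from pathlib import Path

def load_report(path: Path) -> dict:
    # open() now always passes an explicit encoding
    with open(path, "r", encoding="utf-8") as json_file:
        data = json.load(json_file)
    if not data:
        # a specific exception class replaces a bare Exception(...)
        raise ValueError(f"Empty report in {path}")
    return data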

View File

@ -18,7 +18,8 @@ RUN apt-get update && env DEBIAN_FRONTEND=noninteractive apt-get install --yes \
python3-pip \
yamllint \
locales \
&& pip3 install black==23.1.0 boto3 codespell==2.2.1 mypy==1.3.0 PyGithub unidiff pylint==2.6.2 \
&& pip3 install black==23.12.0 boto3 codespell==2.2.1 mypy==1.8.0 PyGithub unidiff pylint==3.1.0 \
requests types-requests \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/* /var/cache/debconf /tmp/* \
&& rm -rf /root/.cache/pip

View File

@ -1,6 +1,4 @@
# vim: ft=config
[BASIC]
[tool.pylint.BASIC]
max-module-lines=2000
# due to SQL
max-line-length=200
@ -9,11 +7,13 @@ max-branches=50
max-nested-blocks=10
max-statements=200
[FORMAT]
ignore-long-lines = (# )?<?https?://\S+>?$
[tool.pylint.FORMAT]
#ignore-long-lines = (# )?<?https?://\S+>?$
[MESSAGES CONTROL]
disable = missing-docstring,
[tool.pylint.'MESSAGES CONTROL']
# pytest.mark.parametrize is not callable (not-callable)
disable = '''
missing-docstring,
too-few-public-methods,
invalid-name,
too-many-arguments,
@ -26,18 +26,15 @@ disable = missing-docstring,
wildcard-import,
unused-wildcard-import,
singleton-comparison,
# pytest.mark.parametrize is not callable (not-callable)
not-callable,
# https://github.com/PyCQA/pylint/issues/3882
# [Python 3.9] Value 'Optional' is unsubscriptable (unsubscriptable-object) (also Union)
unsubscriptable-object,
# Drop them one day:
redefined-outer-name,
broad-except,
bare-except,
no-else-return,
global-statement
'''
[SIMILARITIES]
[tool.pylint.SIMILARITIES]
# due to SQL
min-similarity-lines=1000
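Since pylint 2.5 the configuration can live in pyproject.toml under [tool.pylint.*] tables, so each old rcfile section maps onto a TOML table; a section name containing a space is quoted, as in [tool.pylint.'MESSAGES CONTROL'], and the disable list becomes a TOML multi-line string. A quick way to inspect the migrated settings (assuming this file is the project's pyproject.toml and Python 3.11+ for tomllib):

import tomllib

with open("pyproject.toml", "rb") as fd:
    cfg = tomllib.load(fd)
# the quoted table name is an ordinary key that happens to contain a space
print(cfg["tool"]["pylint"]["MESSAGES CONTROL"]["disable"])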

View File

@ -14,4 +14,4 @@ warn_unused_ignores = False
warn_return_any = True
no_implicit_reexport = True
strict_equality = True
strict_concatenate = True
extra_checks = True

View File

@ -67,14 +67,14 @@ def main():
build_name = get_build_name_for_check(check_name)
urls = read_build_urls(build_name, reports_path)
if not urls:
raise Exception("No build URLs found")
raise ValueError("No build URLs found")
for url in urls:
if url.endswith("/clickhouse"):
build_url = url
break
else:
raise Exception("Cannot find the clickhouse binary among build results")
raise ValueError("Cannot find the clickhouse binary among build results")
logging.info("Got build url %s", build_url)

View File

@ -51,7 +51,7 @@ class Queue:
label: str
def get_scales(runner_type: str) -> Tuple[int, int]:
def get_scales() -> Tuple[int, int]:
"returns the multipliers for scaling down and up ASG by types"
# Scaling down is quicker on the lack of running jobs than scaling up on
# queue
@ -95,7 +95,7 @@ def set_capacity(
continue
raise ValueError("Queue status is not in ['in_progress', 'queued']")
scale_down, scale_up = get_scales(runner_type)
scale_down, scale_up = get_scales()
# With lyfecycle hooks some instances are actually free because some of
# them are in 'Terminating:Wait' state
effective_capacity = max(

View File

@ -1,37 +1,37 @@
#!/usr/bin/env python3
from pathlib import Path
import subprocess
import sys
from typing import List, Sequence, Tuple
import csv
import logging
import subprocess
import sys
from pathlib import Path
from typing import List, Sequence, Tuple
from report import (
ERROR,
FAILURE,
SKIPPED,
SUCCESS,
FAIL,
OK,
TestResult,
TestResults,
JobReport,
)
from env_helper import TEMP_PATH
from stopwatch import Stopwatch
from ci_config import JobNames
from ci_utils import normalize_string
from env_helper import TEMP_PATH
from functional_test_check import NO_CHANGES_MSG
from report import (
ERROR,
FAIL,
FAILURE,
OK,
SKIPPED,
SUCCESS,
JobReport,
TestResult,
TestResults,
)
from stopwatch import Stopwatch
def post_commit_status_from_file(file_path: Path) -> List[str]:
with open(file_path, "r", encoding="utf-8") as f:
res = list(csv.reader(f, delimiter="\t"))
if len(res) < 1:
raise Exception(f'Can\'t read from "{file_path}"')
raise IndexError(f'Can\'t read from "{file_path}"')
if len(res[0]) != 3:
raise Exception(f'Can\'t read from "{file_path}"')
raise IndexError(f'Can\'t read from "{file_path}"')
return res[0]

View File

@ -8,7 +8,10 @@ import time
from pathlib import Path
from typing import Any, Callable, List, Union
import requests # type: ignore
# isort: off
import requests
# isort: on
import get_robot_token as grt # we need an updated ROBOT_TOKEN
from ci_config import CI_CONFIG
@ -30,9 +33,10 @@ def get_with_retries(
"Getting URL with %i tries and sleep %i in between: %s", retries, sleep, url
)
exc = Exception("A placeholder to satisfy typing and avoid nesting")
timeout = kwargs.pop("timeout", 30)
for i in range(retries):
try:
response = requests.get(url, **kwargs)
response = requests.get(url, timeout=timeout, **kwargs)
response.raise_for_status()
return response
except Exception as e:
@ -74,10 +78,11 @@ def get_gh_api(
token_is_set = "Authorization" in kwargs.get("headers", {})
exc = Exception("A placeholder to satisfy typing and avoid nesting")
try_cnt = 0
timeout = kwargs.pop("timeout", 30)
while try_cnt < retries:
try_cnt += 1
try:
response = requests.get(url, **kwargs)
response = requests.get(url, timeout=timeout, **kwargs)
response.raise_for_status()
return response
except requests.HTTPError as e:
@ -85,7 +90,8 @@ def get_gh_api(
ratelimit_exceeded = (
e.response.status_code == 403
and b"rate limit exceeded"
in e.response._content # pylint:disable=protected-access
# pylint:disable-next=protected-access
in (e.response._content or b"")
)
try_auth = e.response.status_code == 404
if (ratelimit_exceeded or try_auth) and not token_is_set:
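Both helpers now pop the timeout out of **kwargs before calling requests.get(), so a caller-supplied timeout still wins, a default of 30 seconds is always applied, and the keyword is never passed twice. A condensed, self-contained sketch of the retry loop (retry count and sleep are illustrative):

import time
import requests

def get_with_retries(url: str, retries: int = 3, sleep: int = 3, **kwargs) -> requests.Response:
    # pop the timeout so requests.get() never receives the keyword twice
    timeout = kwargs.pop("timeout", 30)
    exc = Exception("A placeholder to satisfy typing and avoid nesting")
    for attempt in range(retries):
        try:
            response = requests.get(url, timeout=timeout, **kwargs)
            response.raise_for_status()
            return response
        except Exception as e:
            exc = e
            if attempt + 1 < retries:
                time.sleep(sleep)
    raise exc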

View File

@ -1,16 +1,15 @@
#!/usr/bin/env python3
import json
import re
import time
from base64 import b64decode
from collections import namedtuple
from queue import Queue
from threading import Thread
from typing import Any, Dict, List, Optional, Tuple
from typing import Any, Dict, List, Optional
import requests # type: ignore
from lambda_shared.pr import CATEGORY_TO_LABEL, check_pr_description
import requests
from lambda_shared.pr import check_pr_description
from lambda_shared.token import get_cached_access_token
NEED_RERUN_OR_CANCELL_WORKFLOWS = {
@ -48,16 +47,18 @@ class Worker(Thread):
def _exec_get_with_retry(url: str, token: str) -> dict:
headers = {"Authorization": f"token {token}"}
e = Exception()
for i in range(MAX_RETRY):
try:
response = requests.get(url, headers=headers)
response = requests.get(url, headers=headers, timeout=30)
response.raise_for_status()
return response.json() # type: ignore
except Exception as ex:
print("Got exception executing request", ex)
e = ex
time.sleep(i + 1)
raise Exception("Cannot execute GET request with retries")
raise requests.HTTPError("Cannot execute GET request with retries") from e
WorkflowDescription = namedtuple(
@ -215,16 +216,18 @@ def get_workflow_description(workflow_url: str, token: str) -> WorkflowDescripti
def _exec_post_with_retry(url: str, token: str, json: Optional[Any] = None) -> Any:
headers = {"Authorization": f"token {token}"}
e = Exception()
for i in range(MAX_RETRY):
try:
response = requests.post(url, headers=headers, json=json)
response = requests.post(url, headers=headers, json=json, timeout=30)
response.raise_for_status()
return response.json()
except Exception as ex:
print("Got exception executing request", ex)
e = ex
time.sleep(i + 1)
raise Exception("Cannot execute POST request with retry")
raise requests.HTTPError("Cannot execute POST request with retry") from e
def exec_workflow_url(urls_to_post, token):

View File

@ -456,11 +456,13 @@ class Backport:
tomorrow = date.today() + timedelta(days=1)
logging.info("Receive PRs suppose to be backported")
query_args = dict(
query=f"type:pr repo:{self._fetch_from} -label:{self.backport_created_label}",
label=",".join(self.labels_to_backport + [self.must_create_backport_label]),
merged=[since_date, tomorrow],
)
query_args = {
"query": f"type:pr repo:{self._fetch_from} -label:{self.backport_created_label}",
"label": ",".join(
self.labels_to_backport + [self.must_create_backport_label]
),
"merged": [since_date, tomorrow],
}
logging.info("Query to find the backport PRs:\n %s", query_args)
self.prs_for_backport = self.gh.get_pulls_from_search(**query_args)
logging.info(

View File

@ -397,7 +397,7 @@ class CiCache:
status.dump_to_file(record_file)
elif record_type == self.RecordType.PENDING:
assert isinstance(status, PendingState)
with open(record_file, "w") as json_file:
with open(record_file, "w", encoding="utf-8") as json_file:
json.dump(asdict(status), json_file)
else:
assert False
@ -1006,7 +1006,7 @@ def _mark_success_action(
def _print_results(result: Any, outfile: Optional[str], pretty: bool = False) -> None:
if outfile:
with open(outfile, "w") as f:
with open(outfile, "w", encoding="utf-8") as f:
if isinstance(result, str):
print(result, file=f)
elif isinstance(result, dict):
@ -1126,8 +1126,7 @@ def _configure_jobs(
jobs_to_wait: Dict[str, Dict[str, Any]] = {}
randomization_buckets = {} # type: Dict[str, Set[str]]
for job in digests:
digest = digests[job]
for job, digest in digests.items():
job_config = CI_CONFIG.get_job_config(job)
num_batches: int = job_config.num_batches
batches_to_do: List[int] = []
@ -1636,11 +1635,11 @@ def main() -> int:
indata: Optional[Dict[str, Any]] = None
if args.infile:
indata = (
json.loads(args.infile)
if not os.path.isfile(args.infile)
else json.load(open(args.infile))
)
if os.path.isfile(args.infile):
with open(args.infile, encoding="utf-8") as jfd:
indata = json.load(jfd)
else:
indata = json.loads(args.infile)
assert indata and isinstance(indata, dict), "Invalid --infile json"
result: Dict[str, Any] = {}
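--infile (and, further below, --image-tags and --missing-images in the docker scripts) accepts either a path to a JSON file or an inline JSON string. The nested conditional expression with an unclosed open() call is replaced by an explicit branch; the shared pattern as a standalone sketch (the function name is illustrative):

import json
import os

def load_json_arg(arg: str):
    # an existing file path wins; otherwise treat the argument as inline JSON
    if os.path.isfile(arg):
        with open(arg, "r", encoding="utf-8") as fd:
            return json.load(fd)
    return json.loads(arg)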

View File

@ -704,8 +704,7 @@ class CIConfig:
self.builds_report_config,
self.test_configs,
):
for check_name in config: # type: ignore
yield check_name
yield from config # type: ignore
def get_builds_for_report(
self, report_name: str, release: bool = False, backport: bool = False
@ -828,7 +827,6 @@ CI_CONFIG = CIConfig(
job
for job in JobNames
if not any(
[
nogo in job
for nogo in (
"asan",
@ -838,7 +836,6 @@ CI_CONFIG = CIConfig(
# skip build report jobs as not all builds will be done
"build check",
)
]
)
]
),

View File

@ -8,23 +8,14 @@ Lambda function to:
import argparse
import sys
from datetime import datetime
from typing import Dict, List
from typing import Dict
import requests # type: ignore
import boto3 # type: ignore
from botocore.exceptions import ClientError # type: ignore
from lambda_shared import (
RUNNER_TYPE_LABELS,
RunnerDescription,
RunnerDescriptions,
list_runners,
)
from lambda_shared import RUNNER_TYPE_LABELS, RunnerDescriptions, list_runners
from lambda_shared.token import (
get_access_token_by_key_app,
get_cached_access_token,
get_key_and_app_from_aws,
get_access_token_by_key_app,
)
UNIVERSAL_LABEL = "universal"
@ -162,7 +153,7 @@ if __name__ == "__main__":
if args.private_key:
private_key = args.private_key
elif args.private_key_path:
with open(args.private_key_path, "r") as key_file:
with open(args.private_key_path, "r", encoding="utf-8") as key_file:
private_key = key_file.read()
else:
print("Attempt to get key and id from AWS secret manager")

View File

@ -8,14 +8,13 @@ Lambda function to:
import argparse
import sys
from datetime import datetime
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List
import requests # type: ignore
import boto3 # type: ignore
import requests
from botocore.exceptions import ClientError # type: ignore
from lambda_shared import (
RUNNER_TYPE_LABELS,
RunnerDescription,
@ -23,9 +22,9 @@ from lambda_shared import (
list_runners,
)
from lambda_shared.token import (
get_access_token_by_key_app,
get_cached_access_token,
get_key_and_app_from_aws,
get_access_token_by_key_app,
)
UNIVERSAL_LABEL = "universal"
@ -140,6 +139,7 @@ def delete_runner(access_token: str, runner: RunnerDescription) -> bool:
response = requests.delete(
f"https://api.github.com/orgs/ClickHouse/actions/runners/{runner.id}",
headers=headers,
timeout=30,
)
response.raise_for_status()
print(f"Response code deleting {runner.name} is {response.status_code}")
@ -325,7 +325,7 @@ if __name__ == "__main__":
if args.private_key:
private_key = args.private_key
elif args.private_key_path:
with open(args.private_key_path, "r") as key_file:
with open(args.private_key_path, "r", encoding="utf-8") as key_file:
private_key = key_file.read()
else:
print("Attempt to get key and id from AWS secret manager")

View File

@ -1,13 +1,12 @@
#!/usr/bin/env python3
from pathlib import Path
from typing import Dict, List, Optional
import fileinput
import json
import logging
import time
from pathlib import Path
from typing import Dict, List, Optional
import requests # type: ignore
import requests
from get_robot_token import get_parameter_from_ssm
from pr_info import PRInfo
from report import TestResults
@ -72,11 +71,11 @@ class ClickHouseHelper:
if args:
url = args[0]
url = kwargs.get("url", url)
kwargs["timeout"] = kwargs.get("timeout", 100)
timeout = kwargs.pop("timeout", 100)
for i in range(5):
try:
response = requests.post(*args, **kwargs)
response = requests.post(*args, timeout=timeout, **kwargs)
except Exception as e:
error = f"Received exception while sending data to {url} on {i} attempt: {e}"
logging.warning(error)
@ -148,7 +147,9 @@ class ClickHouseHelper:
for i in range(5):
response = None
try:
response = requests.get(self.url, params=params, headers=self.auth)
response = requests.get(
self.url, params=params, headers=self.auth, timeout=100
)
response.raise_for_status()
return response.text
except Exception as ex:
@ -215,24 +216,24 @@ def prepare_tests_results_for_clickhouse(
head_ref = pr_info.head_ref
head_repo = pr_info.head_name
common_properties = dict(
pull_request_number=pr_info.number,
commit_sha=pr_info.sha,
commit_url=pr_info.commit_html_url,
check_name=check_name,
check_status=check_status,
check_duration_ms=int(float(check_duration) * 1000),
check_start_time=check_start_time,
report_url=report_url,
pull_request_url=pull_request_url,
base_ref=base_ref,
base_repo=base_repo,
head_ref=head_ref,
head_repo=head_repo,
task_url=pr_info.task_url,
instance_type=get_instance_type(),
instance_id=get_instance_id(),
)
common_properties = {
"pull_request_number": pr_info.number,
"commit_sha": pr_info.sha,
"commit_url": pr_info.commit_html_url,
"check_name": check_name,
"check_status": check_status,
"check_duration_ms": int(float(check_duration) * 1000),
"check_start_time": check_start_time,
"report_url": report_url,
"pull_request_url": pull_request_url,
"base_ref": base_ref,
"base_repo": base_repo,
"head_ref": head_ref,
"head_repo": head_repo,
"task_url": pr_info.task_url,
"instance_type": get_instance_type(),
"instance_id": get_instance_id(),
}
# Always publish a total record for all checks. For checks with individual
# tests, also publish a record per test.

View File

@ -303,7 +303,7 @@ def post_commit_status_to_file(
file_path: Path, description: str, state: str, report_url: str
) -> None:
if file_path.exists():
raise Exception(f'File "{file_path}" already exists!')
raise FileExistsError(f'File "{file_path}" already exists!')
with open(file_path, "w", encoding="utf-8") as f:
out = csv.writer(f, delimiter="\t")
out.writerow([state, report_url, description])
@ -329,7 +329,7 @@ class CommitStatusData:
@classmethod
def load_from_file(cls, file_path: Union[Path, str]): # type: ignore
res = {}
with open(file_path, "r") as json_file:
with open(file_path, "r", encoding="utf-8") as json_file:
res = json.load(json_file)
return CommitStatusData(**cls._filter_dict(res))
@ -347,7 +347,7 @@ class CommitStatusData:
def dump_to_file(self, file_path: Union[Path, str]) -> None:
file_path = Path(file_path) or STATUS_FILE_PATH
with open(file_path, "w") as json_file:
with open(file_path, "w", encoding="utf-8") as json_file:
json.dump(asdict(self), json_file)
def is_ok(self):

View File

@ -26,7 +26,7 @@ DOWNLOAD_RETRIES_COUNT = 5
def process_os_check(log_path: Path) -> TestResult:
name = log_path.name
with open(log_path, "r") as log:
with open(log_path, "r", encoding="utf-8") as log:
line = log.read().split("\n")[0].strip()
if line != "OK":
return TestResult(name, "FAIL")
@ -35,7 +35,7 @@ def process_os_check(log_path: Path) -> TestResult:
def process_glibc_check(log_path: Path, max_glibc_version: str) -> TestResults:
test_results = [] # type: TestResults
with open(log_path, "r") as log:
with open(log_path, "r", encoding="utf-8") as log:
for line in log:
if line.strip():
columns = line.strip().split(" ")
@ -204,7 +204,7 @@ def main():
elif "aarch64" in check_name:
max_glibc_version = "2.18" # because of build with newer sysroot?
else:
raise Exception("Can't determine max glibc version")
raise RuntimeError("Can't determine max glibc version")
state, description, test_results, additional_logs = process_result(
result_path,

View File

@ -195,18 +195,21 @@ def main():
ok_cnt = 0
status = SUCCESS # type: StatusType
image_tags = (
json.loads(args.image_tags)
if not os.path.isfile(args.image_tags)
else json.load(open(args.image_tags))
)
missing_images = (
image_tags
if args.missing_images == "all"
else json.loads(args.missing_images)
if not os.path.isfile(args.missing_images)
else json.load(open(args.missing_images))
)
if os.path.isfile(args.image_tags):
with open(args.image_tags, "r", encoding="utf-8") as jfd:
image_tags = json.load(jfd)
else:
image_tags = json.loads(args.image_tags)
if args.missing_images == "all":
missing_images = image_tags
elif os.path.isfile(args.missing_images):
with open(args.missing_images, "r", encoding="utf-8") as jfd:
missing_images = json.load(jfd)
else:
missing_images = json.loads(args.missing_images)
images_build_list = get_images_oredered_list()
for image in images_build_list:

View File

@ -135,18 +135,20 @@ def main():
archs = args.suffixes
assert len(archs) > 1, "arch suffix input param is invalid"
image_tags = (
json.loads(args.image_tags)
if not os.path.isfile(args.image_tags)
else json.load(open(args.image_tags))
)
missing_images = (
list(image_tags)
if args.missing_images == "all"
else json.loads(args.missing_images)
if not os.path.isfile(args.missing_images)
else json.load(open(args.missing_images))
)
if os.path.isfile(args.image_tags):
with open(args.image_tags, "r", encoding="utf-8") as jfd:
image_tags = json.load(jfd)
else:
image_tags = json.loads(args.image_tags)
if args.missing_images == "all":
missing_images = image_tags
elif os.path.isfile(args.missing_images):
with open(args.missing_images, "r", encoding="utf-8") as jfd:
missing_images = json.load(jfd)
else:
missing_images = json.loads(args.missing_images)
test_results = []
status = SUCCESS # type: StatusType

View File

@ -363,8 +363,8 @@ def main():
image = DockerImageData(image_path, image_repo, False)
args.release_type = auto_release_type(args.version, args.release_type)
tags = gen_tags(args.version, args.release_type)
repo_urls = dict()
direct_urls: Dict[str, List[str]] = dict()
repo_urls = {}
direct_urls: Dict[str, List[str]] = {}
release_or_pr, _ = get_release_or_pr(pr_info, args.version)
for arch, build_name in zip(ARCH, ("package_release", "package_aarch64")):

View File

@ -3,7 +3,7 @@
import os
import logging
import requests # type: ignore
import requests
from requests.adapters import HTTPAdapter # type: ignore
from urllib3.util.retry import Retry # type: ignore

View File

@ -12,10 +12,9 @@ from typing import List, Tuple
from build_download_helper import download_all_deb_packages
from clickhouse_helper import CiLogsCredentials
from docker_images_helper import DockerImage, pull_image, get_docker_image
from docker_images_helper import DockerImage, get_docker_image, pull_image
from download_release_packages import download_last_release
from env_helper import REPORT_PATH, TEMP_PATH, REPO_COPY
from env_helper import REPO_COPY, REPORT_PATH, TEMP_PATH
from pr_info import PRInfo
from report import ERROR, SUCCESS, JobReport, StatusType, TestResults, read_test_results
from stopwatch import Stopwatch
@ -54,8 +53,7 @@ def get_image_name(check_name: str) -> str:
return "clickhouse/stateless-test"
if "stateful" in check_name.lower():
return "clickhouse/stateful-test"
else:
raise Exception(f"Cannot deduce image name based on check name {check_name}")
raise ValueError(f"Cannot deduce image name based on check name {check_name}")
def get_run_command(

View File

@ -1,10 +1,10 @@
#!/usr/bin/env python3
import re
import logging
import re
from typing import List, Optional, Tuple
import requests # type: ignore
import requests
CLICKHOUSE_TAGS_URL = "https://api.github.com/repos/ClickHouse/ClickHouse/tags"
CLICKHOUSE_PACKAGE_URL = (
@ -82,13 +82,14 @@ def get_previous_release(server_version: Optional[Version]) -> Optional[ReleaseI
CLICKHOUSE_TAGS_URL, {"page": page, "per_page": 100}, timeout=10
)
if not response.ok:
raise Exception(
"Cannot load the list of tags from github: " + response.reason
logger.error(
"Cannot load the list of tags from github: %s", response.reason
)
response.raise_for_status()
releases_str = set(re.findall(VERSION_PATTERN, response.text))
if len(releases_str) == 0:
raise Exception(
raise ValueError(
"Cannot find previous release for "
+ str(server_version)
+ " server version"

View File

@ -9,7 +9,7 @@ from pathlib import Path
from typing import Any, List
import boto3 # type: ignore
import requests # type: ignore
import requests
from build_download_helper import (
download_build_with_progress,
get_build_name_for_check,
@ -46,7 +46,7 @@ FAILED_TESTS_ANCHOR = "# Failed tests"
def _parse_jepsen_output(path: Path) -> TestResults:
test_results = [] # type: TestResults
current_type = ""
with open(path, "r") as f:
with open(path, "r", encoding="utf-8") as f:
for line in f:
if SUCCESSFUL_TESTS_ANCHOR in line:
current_type = "OK"
@ -101,7 +101,7 @@ def prepare_autoscaling_group_and_get_hostnames(count):
instances = get_autoscaling_group_instances_ids(asg_client, JEPSEN_GROUP_NAME)
counter += 1
if counter > 30:
raise Exception("Cannot wait autoscaling group")
raise RuntimeError("Cannot wait autoscaling group")
ec2_client = boto3.client("ec2", region_name="us-east-1")
return get_instances_addresses(ec2_client, instances)
@ -119,12 +119,12 @@ def clear_autoscaling_group():
instances = get_autoscaling_group_instances_ids(asg_client, JEPSEN_GROUP_NAME)
counter += 1
if counter > 30:
raise Exception("Cannot wait autoscaling group")
raise RuntimeError("Cannot wait autoscaling group")
def save_nodes_to_file(instances: List[Any], temp_path: Path) -> Path:
nodes_path = temp_path / "nodes.txt"
with open(nodes_path, "w") as f:
with open(nodes_path, "w", encoding="utf-8") as f:
f.write("\n".join(instances))
f.flush()
return nodes_path
@ -159,7 +159,7 @@ def main():
)
args = parser.parse_args()
if args.program != "server" and args.program != "keeper":
if args.program not in ("server", "keeper"):
logging.warning("Invalid argument '%s'", args.program)
sys.exit(0)
@ -220,7 +220,7 @@ def main():
f"{S3_URL}/{S3_BUILDS_BUCKET}/{version}/{sha}/binary_release/clickhouse"
)
print(f"Clickhouse version: [{version_full}], sha: [{sha}], url: [{build_url}]")
head = requests.head(build_url)
head = requests.head(build_url, timeout=60)
assert head.status_code == 200, f"Clickhouse binary not found: {build_url}"
else:
build_name = get_build_name_for_check(check_name)

View File

@ -8,7 +8,7 @@ from collections import namedtuple
from typing import Any, Dict, Iterable, List, Optional
import boto3 # type: ignore
import requests # type: ignore
import requests
RUNNER_TYPE_LABELS = [
"builder",

View File

@ -6,7 +6,7 @@ from typing import Tuple
import boto3 # type: ignore
import jwt
import requests # type: ignore
import requests
from . import cached_value_is_valid

View File

@ -9,19 +9,13 @@ from pathlib import Path
from typing import List
from build_download_helper import download_fuzzers
from clickhouse_helper import (
CiLogsCredentials,
)
from docker_images_helper import DockerImage, pull_image, get_docker_image
from env_helper import REPORT_PATH, TEMP_PATH, REPO_COPY
from clickhouse_helper import CiLogsCredentials
from docker_images_helper import DockerImage, get_docker_image, pull_image
from env_helper import REPO_COPY, REPORT_PATH, TEMP_PATH
from pr_info import PRInfo
from stopwatch import Stopwatch
from tee_popen import TeePopen
NO_CHANGES_MSG = "Nothing to run"
@ -130,7 +124,8 @@ def main():
os.chmod(fuzzers_path / file, 0o777)
elif file.endswith("_seed_corpus.zip"):
corpus_path = fuzzers_path / (file.removesuffix("_seed_corpus.zip") + ".in")
zipfile.ZipFile(fuzzers_path / file, "r").extractall(corpus_path)
with zipfile.ZipFile(fuzzers_path / file, "r") as zfd:
zfd.extractall(corpus_path)
result_path = temp_path / "result_path"
result_path.mkdir(parents=True, exist_ok=True)

View File

@ -54,7 +54,7 @@ class Repo:
elif protocol == "origin":
self._url = protocol
else:
raise Exception(f"protocol must be in {self.VALID}")
raise ValueError(f"protocol must be in {self.VALID}")
def __str__(self):
return self._repo
@ -144,7 +144,7 @@ class Release:
for status in statuses:
if status["context"] == RELEASE_READY_STATUS:
if not status["state"] == SUCCESS:
raise Exception(
raise ValueError(
f"the status {RELEASE_READY_STATUS} is {status['state']}"
", not success"
)
@ -153,7 +153,7 @@ class Release:
page += 1
raise Exception(
raise KeyError(
f"the status {RELEASE_READY_STATUS} "
f"is not found for commit {self.release_commit}"
)
@ -188,7 +188,7 @@ class Release:
raise
if check_run_from_master and self._git.branch != "master":
raise Exception("the script must be launched only from master")
raise RuntimeError("the script must be launched only from master")
self.set_release_info()
@ -229,7 +229,7 @@ class Release:
def check_no_tags_after(self):
tags_after_commit = self.run(f"git tag --contains={self.release_commit}")
if tags_after_commit:
raise Exception(
raise RuntimeError(
f"Commit {self.release_commit} belongs to following tags:\n"
f"{tags_after_commit}\nChoose another commit"
)
@ -253,7 +253,7 @@ class Release:
)
output = self.run(f"git branch --contains={self.release_commit} {branch}")
if branch not in output:
raise Exception(
raise RuntimeError(
f"commit {self.release_commit} must belong to {branch} "
f"for {self.release_type} release"
)
@ -464,7 +464,7 @@ class Release:
logging.warning("Rolling back checked out %s for %s", ref, orig_ref)
self.run(f"git reset --hard; git checkout -f {orig_ref}")
raise
else:
# Normal flow when we need to checkout back
if with_checkout_back and need_rollback:
self.run(rollback_cmd)

View File

@ -22,8 +22,8 @@ from typing import (
from build_download_helper import get_gh_api
from ci_config import CI_CONFIG, BuildConfig
from env_helper import REPORT_PATH, TEMP_PATH
from ci_utils import normalize_string
from env_helper import REPORT_PATH, TEMP_PATH
logger = logging.getLogger(__name__)
@ -296,7 +296,7 @@ class JobReport:
def load(cls, from_file=None): # type: ignore
res = {}
from_file = from_file or JOB_REPORT_FILE
with open(from_file, "r") as json_file:
with open(from_file, "r", encoding="utf-8") as json_file:
res = json.load(json_file)
# Deserialize the nested lists of TestResult
test_results_data = res.get("test_results", [])
@ -316,7 +316,7 @@ class JobReport:
raise TypeError("Type not serializable")
to_file = to_file or JOB_REPORT_FILE
with open(to_file, "w") as json_file:
with open(to_file, "w", encoding="utf-8") as json_file:
json.dump(asdict(self), json_file, default=path_converter, indent=2)
@ -418,7 +418,7 @@ class BuildResult:
def load_from_file(cls, file: Union[Path, str]): # type: ignore
if not Path(file).exists():
return None
with open(file, "r") as json_file:
with open(file, "r", encoding="utf-8") as json_file:
res = json.load(json_file)
return BuildResult(**res)

View File

@ -4,9 +4,8 @@ import argparse
import sys
import boto3 # type: ignore
import requests # type: ignore
from lambda_shared.token import get_cached_access_token, get_access_token_by_key_app
import requests
from lambda_shared.token import get_access_token_by_key_app, get_cached_access_token
def get_runner_registration_token(access_token):
@ -17,6 +16,7 @@ def get_runner_registration_token(access_token):
response = requests.post(
"https://api.github.com/orgs/ClickHouse/actions/runners/registration-token",
headers=headers,
timeout=30,
)
response.raise_for_status()
data = response.json()
@ -43,6 +43,7 @@ def main(access_token, push_to_ssm, ssm_parameter_name):
def handler(event, context):
_, _ = event, context
main(get_cached_access_token(), True, "github_runner_registration_token")
@ -85,7 +86,7 @@ if __name__ == "__main__":
if args.private_key:
private_key = args.private_key
else:
with open(args.private_key_path, "r") as key_file:
with open(args.private_key_path, "r", encoding="utf-8") as key_file:
private_key = key_file.read()
token = get_access_token_by_key_app(private_key, args.app_id)

View File

@ -17,12 +17,12 @@ It's deployed to slack-bot-ci-lambda in CI/CD account
See also: https://aretestsgreenyet.com/
"""
import os
import json
import base64
import json
import os
import random
import requests # type: ignore
import requests
DRY_RUN_MARK = "<no url, dry run>"
@ -139,13 +139,11 @@ def get_play_url(query):
def run_clickhouse_query(query):
url = "https://play.clickhouse.com/?user=play&query=" + requests.utils.quote(query)
res = requests.get(url)
url = "https://play.clickhouse.com/?user=play&query=" + requests.compat.quote(query)
res = requests.get(url, timeout=30)
if res.status_code != 200:
print("Failed to execute query: ", res.status_code, res.content)
raise Exception(
"Failed to execute query: {}: {}".format(res.status_code, res.content)
)
res.raise_for_status()
lines = res.text.strip().splitlines()
return [x.split("\t") for x in lines]
@ -159,9 +157,9 @@ def split_broken_and_flaky_tests(failed_tests):
flaky_tests = []
for name, report, count_prev_str, count_str in failed_tests:
count_prev, count = int(count_prev_str), int(count_str)
if (2 <= count and count_prev < 2) or (count_prev == 1 and count == 1):
if (count_prev < 2 <= count) or (count_prev == count == 1):
# It failed 2 times or more within extended time window, it's definitely broken.
# 2 <= count_prev means that it was not reported as broken on previous runs
# 2 <= count means that it was not reported as broken on previous runs
broken_tests.append([name, report])
elif 0 < count and count_prev == 0:
# It failed only once, can be a rare flaky test
@ -172,19 +170,18 @@ def split_broken_and_flaky_tests(failed_tests):
def format_failed_tests_list(failed_tests, failure_type):
if len(failed_tests) == 1:
res = "There is a new {} test:\n".format(failure_type)
res = f"There is a new {failure_type} test:\n"
else:
res = "There are {} new {} tests:\n".format(len(failed_tests), failure_type)
res = f"There are {len(failed_tests)} new {failure_type} tests:\n"
for name, report in failed_tests[:MAX_TESTS_TO_REPORT]:
cidb_url = get_play_url(ALL_RECENT_FAILURES_QUERY.format(name))
res += "- *{}* - <{}|Report> - <{}|CI DB> \n".format(
name, report, cidb_url
)
res += f"- *{name}* - <{report}|Report> - <{cidb_url}|CI DB> \n"
if MAX_TESTS_TO_REPORT < len(failed_tests):
res += "- and {} other tests... :this-is-fine-fire:".format(
len(failed_tests) - MAX_TESTS_TO_REPORT
res += (
f"- and {len(failed_tests) - MAX_TESTS_TO_REPORT} other "
"tests... :this-is-fine-fire:"
)
return res
@ -223,19 +220,16 @@ def get_too_many_failures_message_impl(failures_count):
if random.random() < REPORT_NO_FAILURES_PROBABILITY:
return None
return "Wow, there are *no failures* at all... 0_o"
if curr_failures < MAX_FAILURES:
return_none = (
curr_failures < MAX_FAILURES
or curr_failures < prev_failures
or (curr_failures - prev_failures) / prev_failures < 0.2
)
if return_none:
return None
if prev_failures < MAX_FAILURES:
return ":alert: *CI is broken: there are {} failures during the last 24 hours*".format(
curr_failures
)
if curr_failures < prev_failures:
return None
if (curr_failures - prev_failures) / prev_failures < 0.2:
return None
return "CI is broken and it's getting worse: there are {} failures during the last 24 hours".format(
curr_failures
)
return f":alert: *CI is broken: there are {curr_failures} failures during the last 24 hours*"
return "CI is broken and it's getting worse: there are {curr_failures} failures during the last 24 hours"
def get_too_many_failures_message(failures_count):
@ -254,7 +248,7 @@ def get_failed_checks_percentage_message(percentage):
return None
msg = ":alert: " if p > 1 else "Only " if p < 0.5 else ""
msg += "*{0:.2f}%* of all checks in master have failed yesterday".format(p)
msg += f"*{p:.2f}%* of all checks in master have failed yesterday"
return msg
@ -280,14 +274,10 @@ def send_to_slack_impl(message):
payload = SLACK_MESSAGE_JSON.copy()
payload["text"] = message
res = requests.post(SLACK_URL, json.dumps(payload))
res = requests.post(SLACK_URL, json.dumps(payload), timeout=30)
if res.status_code != 200:
print("Failed to send a message to Slack: ", res.status_code, res.content)
raise Exception(
"Failed to send a message to Slack: {}: {}".format(
res.status_code, res.content
)
)
res.raise_for_status()
def send_to_slack(message):
@ -303,7 +293,7 @@ def query_and_alert_if_needed(query, get_message_func):
if msg is None:
return
msg += "\nCI DB query: <{}|link>".format(get_play_url(query))
msg += f"\nCI DB query: <{get_play_url(query)}|link>"
print("Sending message to slack:", msg)
send_to_slack(msg)
@ -317,6 +307,7 @@ def check_and_alert():
def handler(event, context):
_, _ = event, context
try:
check_and_alert()
return {"statusCode": 200, "body": "OK"}

View File

@ -46,14 +46,14 @@ def main():
build_name = get_build_name_for_check(check_name)
urls = read_build_urls(build_name, reports_path)
if not urls:
raise Exception("No build URLs found")
raise ValueError("No build URLs found")
for url in urls:
if url.endswith("/clickhouse"):
build_url = url
break
else:
raise Exception("Cannot find binary clickhouse among build results")
raise ValueError("Cannot find the clickhouse binary among build results")
logging.info("Got build url %s", build_url)

View File

@ -53,14 +53,14 @@ def main():
print(build_name)
urls = read_build_urls(build_name, reports_path)
if not urls:
raise Exception("No build URLs found")
raise ValueError("No build URLs found")
for url in urls:
if url.endswith("/clickhouse"):
build_url = url
break
else:
raise Exception("Cannot find the clickhouse binary among build results")
raise ValueError("Cannot find the clickhouse binary among build results")
logging.info("Got build url %s", build_url)

View File

@ -1,11 +1,11 @@
#!/usr/bin/env python3
import shutil
import logging
import os
import shutil
import signal
import subprocess
import tempfile
import logging
import signal
class SSHAgent:
@ -21,7 +21,7 @@ class SSHAgent:
def start(self):
if shutil.which("ssh-agent") is None:
raise Exception("ssh-agent binary is not available")
raise RuntimeError("ssh-agent binary is not available")
self._env_backup["SSH_AUTH_SOCK"] = os.environ.get("SSH_AUTH_SOCK")
self._env_backup["SSH_OPTIONS"] = os.environ.get("SSH_OPTIONS")
@ -54,7 +54,7 @@ class SSHAgent:
def remove(self, key_pub):
if key_pub not in self._keys:
raise Exception(f"Private key not found, public part: {key_pub}")
raise ValueError(f"Private key not found, public part: {key_pub}")
if self._keys[key_pub] > 1:
self._keys[key_pub] -= 1
@ -107,7 +107,7 @@ class SSHAgent:
if p.returncode:
message = stderr.strip() + b"\n" + stdout.strip()
raise Exception(message.strip().decode())
raise RuntimeError(message.strip().decode())
return stdout
@ -115,9 +115,9 @@ class SSHAgent:
class SSHKey:
def __init__(self, key_name=None, key_value=None):
if key_name is None and key_value is None:
raise Exception("Either key_name or key_value must be specified")
raise ValueError("Either key_name or key_value must be specified")
if key_name is not None and key_value is not None:
raise Exception("key_name or key_value must be specified")
raise ValueError("key_name or key_value must be specified")
if key_name is not None:
self.key = os.getenv(key_name)
else:

View File

@ -1,14 +1,14 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""This script is used in docker images for stress tests and upgrade tests"""
from multiprocessing import cpu_count
from pathlib import Path
from subprocess import Popen, call, check_output, STDOUT, PIPE
from typing import List
import argparse
import logging
import random
import time
from multiprocessing import cpu_count
from pathlib import Path
from subprocess import PIPE, STDOUT, Popen, call, check_output
from typing import List
def get_options(i: int, upgrade_check: bool) -> str:
@ -90,12 +90,13 @@ def run_func_test(
]
pipes = []
for i, path in enumerate(output_paths):
with open(path, "w") as op:
with open(path, "w", encoding="utf-8") as op:
full_command = (
f"{cmd} {get_options(i, upgrade_check)} {global_time_limit_option} "
f"{skip_tests_option} {upgrade_check_option}"
)
logging.info("Run func tests '%s'", full_command)
# pylint:disable-next=consider-using-with
pipes.append(Popen(full_command, shell=True, stdout=op, stderr=op))
time.sleep(0.5)
return pipes
@ -204,6 +205,7 @@ def prepare_for_hung_check(drop_databases: bool) -> bool:
continue
command = make_query_command(f"DETACH DATABASE {db}")
# we don't wait for drop
# pylint:disable-next=consider-using-with
Popen(command, shell=True)
break
except Exception as ex:
@ -212,7 +214,7 @@ def prepare_for_hung_check(drop_databases: bool) -> bool:
)
time.sleep(i)
else:
raise Exception(
raise RuntimeError(
"Cannot drop databases after stress tests. Probably server consumed "
"too much memory and cannot execute simple queries"
)
@ -293,7 +295,9 @@ def main():
args = parse_args()
if args.drop_databases and not args.hung_check:
raise Exception("--drop-databases only used in hung check (--hung-check)")
raise argparse.ArgumentTypeError(
"--drop-databases only used in hung check (--hung-check)"
)
# FIXME Hung check with ubsan is temporarily disabled due to
# https://github.com/ClickHouse/ClickHouse/issues/45372
@ -359,7 +363,7 @@ def main():
]
)
hung_check_log = args.output_folder / "hung_check.log" # type: Path
tee = Popen(["/usr/bin/tee", hung_check_log], stdin=PIPE)
with Popen(["/usr/bin/tee", hung_check_log], stdin=PIPE) as tee:
res = call(cmd, shell=True, stdout=tee.stdin, stderr=STDOUT, timeout=600)
if tee.stdin is not None:
tee.stdin.close()
@ -367,7 +371,9 @@ def main():
logging.info("Hung check failed with exit code %d", res)
else:
hung_check_status = "No queries hung\tOK\t\\N\t\n"
with open(args.output_folder / "test_results.tsv", "w+") as results:
with open(
args.output_folder / "test_results.tsv", "w+", encoding="utf-8"
) as results:
results.write(hung_check_status)
hung_check_log.unlink()
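The tee process that mirrors hung-check output into hung_check.log is now owned by a with-block, so its pipe and exit status are cleaned up even if the call fails. A self-contained sketch of the pattern (the command and log name are placeholders):

from subprocess import PIPE, STDOUT, Popen, call

with Popen(["/usr/bin/tee", "hung_check.log"], stdin=PIPE) as tee:
    res = call("echo hung check", shell=True, stdout=tee.stdin, stderr=STDOUT, timeout=600)
    if tee.stdin is not None:
        tee.stdin.close()
print("hung check exit code:", res)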

View File

@ -95,7 +95,7 @@ def process_results(
results_path = result_directory / "test_results.tsv"
test_results = read_test_results(results_path, True)
if len(test_results) == 0:
raise Exception("Empty results")
raise ValueError("Empty results")
except Exception as e:
return (
ERROR,

View File

@ -43,7 +43,7 @@ def process_result(
results_path = result_directory / "test_results.tsv"
test_results = read_test_results(results_path)
if len(test_results) == 0:
raise Exception("Empty results")
raise ValueError("Empty results")
return state, description, test_results, additional_files
except Exception:

View File

@ -2,13 +2,12 @@
import argparse
import json
from datetime import datetime
from queue import Queue
from threading import Thread
import requests # type: ignore
import boto3 # type: ignore
import requests
class Keys(set):
@ -34,7 +33,7 @@ class Worker(Thread):
m = self.queue.get()
if m == "":
break
response = requests.get(f"https://github.com/{m}.keys")
response = requests.get(f"https://github.com/{m}.keys", timeout=30)
self.results.add(f"# {m}\n{response.text}\n")
self.queue.task_done()
@ -45,7 +44,9 @@ def get_org_team_members(token: str, org: str, team_slug: str) -> set:
"Accept": "application/vnd.github.v3+json",
}
response = requests.get(
f"https://api.github.com/orgs/{org}/teams/{team_slug}/members", headers=headers
f"https://api.github.com/orgs/{org}/teams/{team_slug}/members",
headers=headers,
timeout=30,
)
response.raise_for_status()
data = response.json()

View File

@ -1,14 +1,14 @@
#!/usr/bin/env python3
from io import TextIOWrapper
from pathlib import Path
from subprocess import Popen, PIPE, STDOUT
from threading import Thread
from time import sleep
from typing import Optional, Union
import logging
import os
import sys
from io import TextIOWrapper
from pathlib import Path
from subprocess import PIPE, STDOUT, Popen
from threading import Thread
from time import sleep
from typing import Optional, Union
# Very simple tee logic implementation. You can specify a shell command, output
@ -98,5 +98,6 @@ class TeePopen:
@property
def log_file(self) -> TextIOWrapper:
if self._log_file is None:
# pylint:disable-next=consider-using-with
self._log_file = open(self._log_file_name, "w", encoding="utf-8")
return self._log_file

View File

@ -8,8 +8,7 @@ from dataclasses import dataclass
from typing import Any, Dict, List
import boto3 # type: ignore
from lambda_shared import RunnerDescriptions, list_runners, cached_value_is_valid
from lambda_shared import RunnerDescriptions, cached_value_is_valid, list_runners
from lambda_shared.token import get_access_token_by_key_app, get_cached_access_token
@ -134,7 +133,7 @@ def main(access_token: str, event: dict) -> Dict[str, List[str]]:
candidates = instances_by_zone[zone]
total_to_kill += num_to_kill
if num_to_kill > len(candidates):
raise Exception(
raise RuntimeError(
f"Required to kill {num_to_kill}, but have only {len(candidates)}"
f" candidates in AV {zone}"
)
@ -196,6 +195,7 @@ def main(access_token: str, event: dict) -> Dict[str, List[str]]:
def handler(event: dict, context: Any) -> Dict[str, List[str]]:
_ = context
return main(get_cached_access_token(), event)
@ -226,7 +226,7 @@ if __name__ == "__main__":
if args.private_key:
private_key = args.private_key
else:
with open(args.private_key_path, "r") as key_file:
with open(args.private_key_path, "r", encoding="utf-8") as key_file:
private_key = key_file.read()
token = get_access_token_by_key_app(private_key, args.app_id)

View File

@ -23,7 +23,7 @@ def get_test_name(line):
for element in elements:
if "(" not in element and ")" not in element:
return element
raise Exception(f"No test name in line '{line}'")
raise ValueError(f"No test name in line '{line}'")
def process_results(

View File

@ -6,7 +6,7 @@ import time
from collections import namedtuple
from urllib.parse import quote
import requests # type: ignore
import requests
from lambda_shared.pr import TRUSTED_CONTRIBUTORS
from lambda_shared.token import get_cached_access_token
@ -90,26 +90,29 @@ def is_trusted_contributor(pr_user_login, pr_user_orgs):
def _exec_get_with_retry(url, token):
headers = {"Authorization": f"token {token}"}
e = Exception()
for i in range(MAX_RETRY):
try:
response = requests.get(url, headers=headers)
response = requests.get(url, headers=headers, timeout=30)
response.raise_for_status()
return response.json()
except Exception as ex:
print("Got exception executing request", ex)
e = ex
time.sleep(i + 1)
raise Exception("Cannot execute GET request with retries")
raise requests.HTTPError("Cannot execute GET request with retries") from e
def _exec_post_with_retry(url, token, data=None):
headers = {"Authorization": f"token {token}"}
e = Exception()
for i in range(MAX_RETRY):
try:
if data:
response = requests.post(url, headers=headers, json=data)
response = requests.post(url, headers=headers, json=data, timeout=30)
else:
response = requests.post(url, headers=headers)
response = requests.post(url, headers=headers, timeout=30)
if response.status_code == 403:
data = response.json()
if (
@ -123,9 +126,10 @@ def _exec_post_with_retry(url, token, data=None):
return response.json()
except Exception as ex:
print("Got exception executing request", ex)
e = ex
time.sleep(i + 1)
raise Exception("Cannot execute POST request with retry")
raise requests.HTTPError("Cannot execute POST request with retry") from e
def _get_pull_requests_from(repo_url, owner, branch, token):

View File

@ -5,48 +5,46 @@
# pylint: disable=too-many-lines
# pylint: disable=anomalous-backslash-in-string
import enum
from queue import Full
import shutil
import itertools
import sys
import os
import os.path
import glob
import platform
import signal
import re
import copy
import traceback
import math
import enum
import glob
# Not requests, to avoid requiring extra dependency.
import http.client
import urllib.parse
import itertools
import json
import math
import multiprocessing
import os
import os.path
import platform
import random
import re
import shutil
import signal
import socket
import string
import subprocess
import sys
import traceback
import urllib.parse
# for crc32
import zlib
from argparse import ArgumentParser
from typing import Tuple, Union, Optional, Dict, Set, List
import subprocess
from subprocess import Popen
from subprocess import PIPE
from contextlib import closing
from datetime import datetime, timedelta
from time import time, sleep
from errno import ESRCH
from queue import Full
from subprocess import PIPE, Popen
from time import sleep, time
from typing import Dict, List, Optional, Set, Tuple, Union
try:
import termcolor # type: ignore
except ImportError:
termcolor = None
import random
import string
import multiprocessing
import socket
from contextlib import closing
USE_JINJA = True
try:
@ -70,7 +68,7 @@ TEST_FILE_EXTENSIONS = [".sql", ".sql.j2", ".sh", ".py", ".expect"]
VERSION_PATTERN = r"^((\d+\.)?(\d+\.)?(\d+\.)?\d+)$"
def stringhash(s):
def stringhash(s: str) -> int:
# default hash() function consistent
# only during process invocation https://stackoverflow.com/a/42089311
return zlib.crc32(s.encode("utf-8"))
@ -94,6 +92,10 @@ def trim_for_log(s):
return "\n".join(lines)
class TestException(Exception):
pass
class HTTPError(Exception):
def __init__(self, message=None, code=None):
self.message = message
@ -250,7 +252,7 @@ def get_db_engine(args, database_name):
def get_create_database_settings(args, testcase_args):
create_database_settings = dict()
create_database_settings = {}
if testcase_args:
create_database_settings["log_comment"] = testcase_args.testcase_basename
if args.db_engine == "Ordinary":
@ -1186,7 +1188,7 @@ class TestCase:
)
if result_is_different:
diff_proc = Popen(
with Popen(
[
"diff",
"-U",
@ -1197,23 +1199,23 @@ class TestCase:
encoding="latin-1",
stdout=PIPE,
universal_newlines=True,
)
) as diff_proc:
if self.show_whitespaces_in_diff:
sed_proc = Popen(
with Popen(
["sed", "-e", "s/[ \t]\\+$/&$/g"],
stdin=diff_proc.stdout,
stdout=PIPE,
) as sed_proc:
diff = sed_proc.communicate()[0].decode(
"utf-8", errors="ignore"
)
diff_proc.stdout.close() # Allow diff to receive a SIGPIPE if cat exits.
diff = sed_proc.communicate()[0].decode("utf-8", errors="ignore")
else:
diff = diff_proc.communicate()[0]
if diff.startswith("Binary files "):
diff += "Content of stdout:\n===================\n"
file = open(self.stdout_file, "rb")
with open(self.stdout_file, "rb") as file:
diff += str(file.read())
file.close()
diff += "==================="
description += f"\n{diff}\n"
if debug_log:
@ -1376,6 +1378,7 @@ class TestCase:
command = pattern.format(**params)
# pylint:disable-next=consider-using-with; TODO: fix
proc = Popen(command, shell=True, env=os.environ, start_new_session=True)
while (
@ -1542,7 +1545,7 @@ class TestCase:
)
if len(leftover_tables) != 0:
raise Exception(
raise TestException(
f"The test should cleanup its tables ({leftover_tables}), otherwise it is inconvenient for running it locally."
)
@ -1625,7 +1628,7 @@ class TestSuite:
):
return "#"
else:
raise Exception(f"Unknown file_extension: {filename}")
raise TestException(f"Unknown file_extension: {filename}")
def parse_tags_from_line(line, comment_sign) -> Set[str]:
if not line.startswith(comment_sign):
@ -1686,18 +1689,23 @@ class TestSuite:
self.suite_tmp_path: str = suite_tmp_path
self.suite: str = suite
filter_func = lambda x: True # noqa: ignore E731
if args.run_by_hash_num is not None and args.run_by_hash_total is not None:
if args.run_by_hash_num > args.run_by_hash_total:
raise Exception(
raise TestException(
f"Incorrect run by hash, value {args.run_by_hash_num} bigger than total {args.run_by_hash_total}"
)
filter_func = (
lambda x: stringhash(x) % args.run_by_hash_total == args.run_by_hash_num
def filter_func(x: str) -> bool:
return bool(
stringhash(x) % args.run_by_hash_total == args.run_by_hash_num
)
else:
def filter_func(x: str) -> bool:
_ = x
return True
self.all_tests: List[str] = self.get_tests_list(
self.tests_in_suite_key_func, filter_func
)
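filter_func implements --run-by-hash-num/--run-by-hash-total sharding: a test is kept only when its hash falls into this runner's bucket, and crc32 (via stringhash above) is used because the builtin hash() is not stable across processes. Replacing the conditional lambda assignment with two def branches satisfies pylint and gives both branches a proper return type. The bucket check itself, reduced to a sketch (keep_test is an illustrative name):

import zlib

def stringhash(s: str) -> int:
    # stable across interpreter runs, unlike the builtin hash()
    return zlib.crc32(s.encode("utf-8"))

def keep_test(name: str, run_by_hash_num: int, run_by_hash_total: int) -> bool:
    return stringhash(name) % run_by_hash_total == run_by_hash_num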
@ -2418,7 +2426,7 @@ def main(args):
pid = get_server_pid()
print("Got server pid", pid)
print_stacktraces()
raise Exception(msg)
raise TestException(msg)
args.build_flags = collect_build_flags(args)
args.changed_merge_tree_settings = collect_changed_merge_tree_settings(args)
@ -2622,7 +2630,7 @@ def find_binary(name):
if os.access(bin_path, os.X_OK):
return bin_path
raise Exception(f"{name} was not found in PATH")
raise TestException(f"{name} was not found in PATH")
def find_clickhouse_command(binary, command):

View File

@ -11,9 +11,11 @@ import shlex
import shutil
import string
import subprocess
import sys
import time
import zlib # for crc32
from collections import defaultdict
from itertools import chain
from integration_test_images import IMAGES
@ -102,7 +104,7 @@ def get_counters(fname):
"SKIPPED": set([]),
}
with open(fname, "r") as out:
with open(fname, "r", encoding="utf-8") as out:
for line in out:
line = line.strip()
# Example of log:
@ -118,7 +120,7 @@ def get_counters(fname):
# [gw0] [ 7%] ERROR test_mysql_protocol/test.py::test_golang_client
# ^^^^^^^^^^^^^
if line.strip().startswith("["):
line = re.sub("^\[[^\[\]]*\] \[[^\[\]]*\] ", "", line)
line = re.sub(r"^\[[^\[\]]*\] \[[^\[\]]*\] ", "", line)
line_arr = line.split(" ")
if len(line_arr) < 2:
@ -160,7 +162,7 @@ def get_counters(fname):
def parse_test_times(fname):
read = False
description_output = []
with open(fname, "r") as out:
with open(fname, "r", encoding="utf-8") as out:
for line in out:
if read and "==" in line:
break
@ -196,7 +198,7 @@ def clear_ip_tables_and_restart_daemons():
shell=True,
)
except subprocess.CalledProcessError as err:
logging.info("docker kill excepted: " + str(err))
logging.info("docker kill excepted: %s", str(err))
try:
logging.info("Removing all docker containers")
@ -205,7 +207,7 @@ def clear_ip_tables_and_restart_daemons():
shell=True,
)
except subprocess.CalledProcessError as err:
logging.info("docker rm excepted: " + str(err))
logging.info("docker rm excepted: %s", str(err))
# don't restart docker if it's disabled
if os.environ.get("CLICKHOUSE_TESTS_RUNNER_RESTART_DOCKER", "1") == "1":
@ -213,7 +215,7 @@ def clear_ip_tables_and_restart_daemons():
logging.info("Stopping docker daemon")
subprocess.check_output("service docker stop", shell=True)
except subprocess.CalledProcessError as err:
logging.info("docker stop excepted: " + str(err))
logging.info("docker stop excepted: %s", str(err))
try:
for i in range(200):
@ -226,9 +228,9 @@ def clear_ip_tables_and_restart_daemons():
time.sleep(0.5)
logging.info("Waiting docker to start, current %s", str(err))
else:
raise Exception("Docker daemon doesn't responding")
raise RuntimeError("Docker daemon doesn't responding")
except subprocess.CalledProcessError as err:
logging.info("Can't reload docker: " + str(err))
logging.info("Can't reload docker: %s", str(err))
iptables_iter = 0
try:
@ -276,13 +278,14 @@ class ClickhouseIntegrationTestsRunner:
def base_path(self):
return os.path.join(str(self.result_path), "../")
def should_skip_tests(self):
@staticmethod
def should_skip_tests():
return []
def get_image_with_version(self, name):
if name in self.image_versions:
return name + ":" + self.image_versions[name]
logging.warn(
logging.warning(
"Cannot find image %s in params list %s", name, self.image_versions
)
if ":" not in name:
@ -292,7 +295,7 @@ class ClickhouseIntegrationTestsRunner:
def get_image_version(self, name: str):
if name in self.image_versions:
return self.image_versions[name]
logging.warn(
logging.warning(
"Cannot find image %s in params list %s", name, self.image_versions
)
return "latest"
@ -304,13 +307,9 @@ class ClickhouseIntegrationTestsRunner:
image_cmd = self._get_runner_image_cmd(repo_path)
cmd = (
"cd {repo_path}/tests/integration && "
"timeout --signal=KILL 1h ./runner {runner_opts} {image_cmd} --pre-pull --command '{command}' ".format(
repo_path=repo_path,
runner_opts=self._get_runner_opts(),
image_cmd=image_cmd,
command=r""" echo Pre Pull finished """,
)
f"cd {repo_path}/tests/integration && "
f"timeout --signal=KILL 1h ./runner {self._get_runner_opts()} {image_cmd} "
"--pre-pull --command ' echo Pre Pull finished ' "
)
for i in range(5):
@ -322,14 +321,15 @@ class ClickhouseIntegrationTestsRunner:
)
return
except subprocess.CalledProcessError as err:
logging.info("docker-compose pull failed: " + str(err))
logging.info("docker-compose pull failed: %s", str(err))
continue
logging.error("Pulling images failed for 5 attempts. Will fail the worker.")
# We pass specific retcode to to ci/integration_test_check.py to skip status reporting and restart job
exit(13)
sys.exit(13)
def _can_run_with(self, path, opt):
with open(path, "r") as script:
@staticmethod
def _can_run_with(path, opt):
with open(path, "r", encoding="utf-8") as script:
for line in script:
if opt in line:
return True
@ -349,19 +349,23 @@ class ClickhouseIntegrationTestsRunner:
logging.info("Package found in %s", full_path)
log_name = "install_" + f + ".log"
log_path = os.path.join(str(self.path()), log_name)
with open(log_path, "w") as log:
cmd = "dpkg -x {} .".format(full_path)
with open(log_path, "w", encoding="utf-8") as log:
cmd = f"dpkg -x {full_path} ."
logging.info("Executing installation cmd %s", cmd)
retcode = subprocess.Popen(
with subprocess.Popen(
cmd, shell=True, stderr=log, stdout=log
).wait()
if retcode == 0:
logging.info("Installation of %s successfull", full_path)
) as proc:
if proc.wait() == 0:
logging.info(
"Installation of %s successfull", full_path
)
else:
raise Exception("Installation of %s failed", full_path)
raise RuntimeError(
f"Installation of {full_path} failed"
)
break
else:
raise Exception("Package with {} not found".format(package))
raise FileNotFoundError(f"Package with {package} not found")
# logging.info("Unstripping binary")
# logging.info(
# "Unstring %s",
@ -387,11 +391,11 @@ class ClickhouseIntegrationTestsRunner:
os.getenv("CLICKHOUSE_TESTS_LIBRARY_BRIDGE_BIN_PATH"),
)
def _compress_logs(self, dir, relpaths, result_path):
@staticmethod
def _compress_logs(directory, relpaths, result_path):
retcode = subprocess.call( # STYLE_CHECK_ALLOW_SUBPROCESS_CHECK_CALL
"tar --use-compress-program='zstd --threads=0' -cf {} -C {} {}".format(
result_path, dir, " ".join(relpaths)
),
f"tar --use-compress-program='zstd --threads=0' -cf {result_path} -C "
f"{directory} {' '.join(relpaths)}",
shell=True,
)
# tar return 1 when the files are changed on compressing, we ignore it
@ -443,26 +447,25 @@ class ClickhouseIntegrationTestsRunner:
return list(sorted(all_tests))
def _get_parallel_tests_skip_list(self, repo_path):
skip_list_file_path = "{}/tests/integration/parallel_skip.json".format(
repo_path
)
@staticmethod
def _get_parallel_tests_skip_list(repo_path):
skip_list_file_path = f"{repo_path}/tests/integration/parallel_skip.json"
if (
not os.path.isfile(skip_list_file_path)
or os.path.getsize(skip_list_file_path) == 0
):
raise Exception(
"There is something wrong with getting all tests list: file '{}' is empty or does not exist.".format(
skip_list_file_path
)
raise ValueError(
"There is something wrong with getting all tests list: "
f"file '{skip_list_file_path}' is empty or does not exist."
)
skip_list_tests = []
with open(skip_list_file_path, "r") as skip_list_file:
with open(skip_list_file_path, "r", encoding="utf-8") as skip_list_file:
skip_list_tests = json.load(skip_list_file)
return list(sorted(skip_list_tests))
def group_test_by_file(self, tests):
@staticmethod
def group_test_by_file(tests):
result = {}
for test in tests:
test_file = test.split("::")[0]
@ -471,7 +474,8 @@ class ClickhouseIntegrationTestsRunner:
result[test_file].append(test)
return result
def _update_counters(self, main_counters, current_counters, broken_tests):
@staticmethod
def _update_counters(main_counters, current_counters, broken_tests):
for test in current_counters["PASSED"]:
if test not in main_counters["PASSED"]:
if test in main_counters["FAILED"]:
@ -511,21 +515,23 @@ class ClickhouseIntegrationTestsRunner:
logging.info(
"Can run with custom docker image version %s", runner_version
)
image_cmd += " --docker-image-version={} ".format(runner_version)
image_cmd += f" --docker-image-version={runner_version} "
else:
if self._can_run_with(
os.path.join(repo_path, "tests/integration", "runner"),
"--docker-compose-images-tags",
):
image_cmd += "--docker-compose-images-tags={} ".format(
self.get_image_with_version(img)
image_cmd += (
"--docker-compose-images-tags="
f"{self.get_image_with_version(img)} "
)
else:
image_cmd = ""
logging.info("Cannot run with custom docker image version :(")
return image_cmd
def _find_test_data_dirs(self, repo_path, test_names):
@staticmethod
def _find_test_data_dirs(repo_path, test_names):
relpaths = {}
for test_name in test_names:
if "/" in test_name:
@ -543,7 +549,8 @@ class ClickhouseIntegrationTestsRunner:
relpaths[relpath] = mtime
return relpaths
def _get_test_data_dirs_difference(self, new_snapshot, old_snapshot):
@staticmethod
def _get_test_data_dirs_difference(new_snapshot, old_snapshot):
res = set()
for path in new_snapshot:
if (path not in old_snapshot) or (old_snapshot[path] != new_snapshot[path]):
@ -569,7 +576,7 @@ class ClickhouseIntegrationTestsRunner:
broken_tests,
)
except Exception as e:
logging.info("Failed to run {}:\n{}".format(str(test_group), str(e)))
logging.info("Failed to run %s:\n%s", test_group, e)
counters = {
"ERROR": [],
"PASSED": [],
@ -630,31 +637,27 @@ class ClickhouseIntegrationTestsRunner:
info_path = os.path.join(repo_path, "tests/integration", info_basename)
test_cmd = " ".join([shlex.quote(test) for test in sorted(test_names)])
parallel_cmd = (
" --parallel {} ".format(num_workers) if num_workers > 0 else ""
)
parallel_cmd = f" --parallel {num_workers} " if num_workers > 0 else ""
# -r -- show extra test summary:
# -f -- (f)ailed
# -E -- (E)rror
# -p -- (p)assed
# -s -- (s)kipped
cmd = "cd {}/tests/integration && timeout --signal=KILL 1h ./runner {} {} -t {} {} -- -rfEps --run-id={} --color=no --durations=0 {} | tee {}".format(
repo_path,
self._get_runner_opts(),
image_cmd,
test_cmd,
parallel_cmd,
i,
_get_deselect_option(self.should_skip_tests()),
info_path,
cmd = (
f"cd {repo_path}/tests/integration && "
f"timeout --signal=KILL 1h ./runner {self._get_runner_opts()} "
f"{image_cmd} -t {test_cmd} {parallel_cmd} -- -rfEps --run-id={i} "
f"--color=no --durations=0 {_get_deselect_option(self.should_skip_tests())} "
f"| tee {info_path}"
)
log_basename = test_group_str + "_" + str(i) + ".log"
log_path = os.path.join(repo_path, "tests/integration", log_basename)
with open(log_path, "w") as log:
with open(log_path, "w", encoding="utf-8") as log:
logging.info("Executing cmd: %s", cmd)
# ignore retcode, since it is not meaningful due to the pipe to tee
subprocess.Popen(cmd, shell=True, stderr=log, stdout=log).wait()
with subprocess.Popen(cmd, shell=True, stderr=log, stdout=log) as proc:
proc.wait()
extra_logs_names = [log_basename]
log_result_path = os.path.join(
@ -745,11 +748,14 @@ class ClickhouseIntegrationTestsRunner:
# want to mark them as error so we filter by '::'.
for test in tests_in_group:
if (
test not in counters["PASSED"]
and test not in counters["ERROR"]
and test not in counters["SKIPPED"]
and test not in counters["FAILED"]
and test not in counters["BROKEN"]
test
not in chain(
counters["PASSED"],
counters["ERROR"],
counters["SKIPPED"],
counters["FAILED"],
counters["BROKEN"],
)
and "::" in test
):
counters["ERROR"].append(test)
@ -814,7 +820,7 @@ class ClickhouseIntegrationTestsRunner:
(
c + " (✕" + str(final_retry) + ")",
text_state,
"{:.2f}".format(tests_times[c]),
f"{tests_times[c]:.2f}",
)
for c in counters[state]
]
@ -836,7 +842,7 @@ class ClickhouseIntegrationTestsRunner:
self._install_clickhouse(build_path)
logging.info("Pulling images")
runner._pre_pull_images(repo_path)
self._pre_pull_images(repo_path)
logging.info(
"Dump iptables before run %s",
@ -909,11 +915,15 @@ class ClickhouseIntegrationTestsRunner:
logging.info("Shuffling test groups")
random.shuffle(items_to_run)
broken_tests = list()
broken_tests = []
if self.use_analyzer:
with open(f"{repo_path}/tests/analyzer_integration_broken_tests.txt") as f:
with open(
f"{repo_path}/tests/analyzer_integration_broken_tests.txt",
"r",
encoding="utf-8",
) as f:
broken_tests = f.read().splitlines()
logging.info(f"Broken tests in the list: {len(broken_tests)}")
logging.info("Broken tests in the list: %s", len(broken_tests))
for group, tests in items_to_run:
logging.info("Running test group %s containing %s tests", group, len(tests))
@ -965,12 +975,12 @@ class ClickhouseIntegrationTestsRunner:
else:
text_state = state
test_result += [
(c, text_state, "{:.2f}".format(tests_times[c]), tests_log_paths[c])
(c, text_state, f"{tests_times[c]:.2f}", tests_log_paths[c])
for c in counters[state]
]
failed_sum = len(counters["FAILED"]) + len(counters["ERROR"])
status_text = "fail: {}, passed: {}".format(failed_sum, len(counters["PASSED"]))
status_text = f"fail: {failed_sum}, passed: {len(counters['PASSED'])}"
if self.soft_deadline_time < time.time():
status_text = "Timeout, " + status_text
@ -987,10 +997,10 @@ class ClickhouseIntegrationTestsRunner:
def write_results(results_file, status_file, results, status):
with open(results_file, "w") as f:
with open(results_file, "w", encoding="utf-8") as f:
out = csv.writer(f, delimiter="\t")
out.writerows(results)
with open(status_file, "w") as f:
with open(status_file, "w", encoding="utf-8") as f:
out = csv.writer(f, delimiter="\t")
out.writerow(status)
@ -1003,7 +1013,8 @@ if __name__ == "__main__":
result_path = os.environ.get("CLICKHOUSE_TESTS_RESULT_PATH")
params_path = os.environ.get("CLICKHOUSE_TESTS_JSON_PARAMS_PATH")
params = json.loads(open(params_path, "r").read())
with open(params_path, "r", encoding="utf-8") as jfd:
params = json.loads(jfd.read())
runner = ClickhouseIntegrationTestsRunner(result_path, params)
logging.info("Running tests")

View File

@ -1,10 +1,11 @@
from helpers.cluster import run_and_check
import pytest
#!/usr/bin/env python3
import logging
import os
from helpers.test_tools import TSV
from helpers.network import _NetworkManager
import pytest # pylint:disable=import-error; for style check
from helpers.cluster import run_and_check
from helpers.network import _NetworkManager
# This is a workaround for a problem with logging in pytest [1].
#
@ -32,32 +33,35 @@ def tune_local_port_range():
def cleanup_environment():
try:
if int(os.environ.get("PYTEST_CLEANUP_CONTAINERS", 0)) == 1:
logging.debug(f"Cleaning all iptables rules")
logging.debug("Cleaning all iptables rules")
_NetworkManager.clean_all_user_iptables_rules()
result = run_and_check(["docker ps | wc -l"], shell=True)
if int(result) > 1:
if int(os.environ.get("PYTEST_CLEANUP_CONTAINERS", 0)) != 1:
logging.warning(
f"Docker containters({int(result)}) are running before tests run. They can be left from previous pytest run and cause test failures.\n"
"You can set env PYTEST_CLEANUP_CONTAINERS=1 or use runner with --cleanup-containers argument to enable automatic containers cleanup."
"Docker containters(%s) are running before tests run. "
"They can be left from previous pytest run and cause test failures.\n"
"You can set env PYTEST_CLEANUP_CONTAINERS=1 or use runner with "
"--cleanup-containers argument to enable automatic containers cleanup.",
int(result),
)
else:
logging.debug("Trying to kill unstopped containers...")
run_and_check(
[f"docker kill $(docker container list --all --quiet)"],
["docker kill $(docker container list --all --quiet)"],
shell=True,
nothrow=True,
)
run_and_check(
[f"docker rm $docker container list --all --quiet)"],
["docker rm $docker container list --all --quiet)"],
shell=True,
nothrow=True,
)
logging.debug("Unstopped containers killed")
r = run_and_check(["docker-compose", "ps", "--services", "--all"])
logging.debug(f"Docker ps before start:{r.stdout}")
logging.debug("Docker ps before start:%s", r.stdout)
else:
logging.debug(f"No running containers")
logging.debug("No running containers")
logging.debug("Pruning Docker networks")
run_and_check(
@ -66,8 +70,7 @@ def cleanup_environment():
nothrow=True,
)
except Exception as e:
logging.exception(f"cleanup_environment:{str(e)}")
pass
logging.exception("cleanup_environment:%s", e)
yield

View File

@ -39,9 +39,7 @@ def check_args_and_update_paths(args):
else:
CLICKHOUSE_ROOT = args.clickhouse_root
else:
logging.info(
"ClickHouse root is not set. Will use %s" % (DEFAULT_CLICKHOUSE_ROOT)
)
logging.info("ClickHouse root is not set. Will use %s", DEFAULT_CLICKHOUSE_ROOT)
CLICKHOUSE_ROOT = DEFAULT_CLICKHOUSE_ROOT
if not os.path.isabs(args.binary):
@ -74,9 +72,7 @@ def check_args_and_update_paths(args):
args.base_configs_dir = os.path.abspath(
os.path.join(CLICKHOUSE_ROOT, CONFIG_DIR_IN_REPO)
)
logging.info(
"Base configs dir is not set. Will use %s" % (args.base_configs_dir)
)
logging.info("Base configs dir is not set. Will use %s", args.base_configs_dir)
if args.cases_dir:
if not os.path.isabs(args.cases_dir):
@ -87,7 +83,7 @@ def check_args_and_update_paths(args):
args.cases_dir = os.path.abspath(
os.path.join(CLICKHOUSE_ROOT, INTEGRATION_DIR_IN_REPO)
)
logging.info("Cases dir is not set. Will use %s" % (args.cases_dir))
logging.info("Cases dir is not set. Will use %s", args.cases_dir)
if args.utils_dir:
if not os.path.isabs(args.utils_dir):
@ -98,12 +94,13 @@ def check_args_and_update_paths(args):
args.utils_dir = os.path.abspath(
os.path.join(CLICKHOUSE_ROOT, UTILS_DIR_IN_REPO)
)
logging.info("utils dir is not set. Will use %s" % (args.utils_dir))
logging.info("utils dir is not set. Will use %s", args.utils_dir)
logging.info(
"base_configs_dir: {}, binary: {}, cases_dir: {} ".format(
args.base_configs_dir, args.binary, args.cases_dir
)
"base_configs_dir: %s, binary: %s, cases_dir: %s ",
args.base_configs_dir,
args.binary,
args.cases_dir,
)
for path in [
@ -115,7 +112,7 @@ def check_args_and_update_paths(args):
CLICKHOUSE_ROOT,
]:
if not os.path.exists(path):
raise Exception("Path {} doesn't exist".format(path))
raise FileNotFoundError(f"Path {path} doesn't exist")
if args.dockerd_volume:
if not os.path.isabs(args.dockerd_volume):
@ -126,21 +123,22 @@ def check_args_and_update_paths(args):
if (not os.path.exists(os.path.join(args.base_configs_dir, "config.xml"))) and (
not os.path.exists(os.path.join(args.base_configs_dir, "config.yaml"))
):
raise Exception(
"No config.xml or config.yaml in {}".format(args.base_configs_dir)
raise FileNotFoundError(
f"No config.xml or config.yaml in {args.base_configs_dir}"
)
if (not os.path.exists(os.path.join(args.base_configs_dir, "users.xml"))) and (
not os.path.exists(os.path.join(args.base_configs_dir, "users.yaml"))
):
raise Exception(
"No users.xml or users.yaml in {}".format(args.base_configs_dir)
raise FileNotFoundError(
f"No users.xml or users.yaml in {args.base_configs_dir}"
)
def docker_kill_handler_handler(signum, frame):
_, _ = signum, frame
subprocess.check_call(
"docker ps --all --quiet --filter name={name}".format(name=CONTAINER_NAME),
f"docker ps --all --quiet --filter name={CONTAINER_NAME}",
shell=True,
)
raise KeyboardInterrupt("Killed by Ctrl+C")
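Several hunks in this script replace bare Exception with more specific types such as FileNotFoundError. A short sketch of the idea, reduced to a made-up config check rather than the script's full validation:

import os

def require_config(base_configs_dir: str) -> str:
    path = os.path.join(base_configs_dir, "config.xml")
    if not os.path.exists(path):
        # A concrete exception type lets callers catch "missing file"
        # separately from other failures.
        raise FileNotFoundError(f"No config.xml in {base_configs_dir}")
    return path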
@ -318,7 +316,7 @@ if __name__ == "__main__":
parallel_args = ""
if args.parallel:
parallel_args += "--dist=loadfile"
parallel_args += " -n {}".format(args.parallel)
parallel_args += f" -n {args.parallel}".format()
rand_args = ""
# if not args.no_random:
@ -326,7 +324,7 @@ if __name__ == "__main__":
net = ""
if args.network:
net = "--net={}".format(args.network)
net = f"--net={args.network}"
elif not args.disable_net_host:
net = "--net=host"
@ -350,9 +348,7 @@ if __name__ == "__main__":
dockerd_internal_volume = "--tmpfs /var/lib/docker -e DOCKER_RAMDISK=true"
elif args.dockerd_volume:
dockerd_internal_volume = (
"--mount type=bind,source={},target=/var/lib/docker".format(
args.dockerd_volume
)
f"--mount type=bind,source={args.dockerd_volume},target=/var/lib/docker"
)
else:
try:

View File

@ -1,18 +1,15 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import datetime
import logging
import pyodbc
import sqlite3
import traceback
import enum
import logging
import random
import sqlite3
import string
from contextlib import contextmanager
import pyodbc # pylint:disable=import-error; for style check
from exceptions import ProgramError
logger = logging.getLogger("connection")
logger.setLevel(logging.DEBUG)
@ -22,9 +19,7 @@ class OdbcConnectingArgs:
self._kwargs = kwargs
def __str__(self):
conn_str = ";".join(
["{}={}".format(x, y) for x, y in self._kwargs.items() if y]
)
conn_str = ";".join([f"{x}={y}" for x, y in self._kwargs.items() if y])
return conn_str
def update_database(self, database):
@ -49,6 +44,7 @@ class OdbcConnectingArgs:
for kv in conn_str.split(";"):
if kv:
k, v = kv.split("=", 1)
# pylint:disable-next=protected-access
args._kwargs[k] = v
return args
@ -63,7 +59,10 @@ def default_clickhouse_odbc_conn_str():
OdbcConnectingArgs.create_from_kw(
dsn="ClickHouse DSN (ANSI)",
Timeout="300",
Url="http://localhost:8123/query?default_format=ODBCDriver2&default_table_engine=MergeTree&union_default_mode=DISTINCT&group_by_use_nulls=1&join_use_nulls=1&allow_create_index_without_type=1&create_index_ignore_unique=1",
Url="http://localhost:8123/query?default_format=ODBCDriver2&"
"default_table_engine=MergeTree&union_default_mode=DISTINCT&"
"group_by_use_nulls=1&join_use_nulls=1&allow_create_index_without_type=1&"
"create_index_ignore_unique=1",
)
)
@ -82,7 +81,7 @@ class KnownDBMS(str, enum.Enum):
clickhouse = "ClickHouse"
class ConnectionWrap(object):
class ConnectionWrap:
def __init__(self, connection=None, factory=None, factory_kwargs=None):
self._factory = factory
self._factory_kwargs = factory_kwargs
@ -126,7 +125,7 @@ class ConnectionWrap(object):
f"SELECT name FROM system.tables WHERE database='{self.DATABASE_NAME}'"
)
elif self.DBMS_NAME == KnownDBMS.sqlite.value:
list_query = f"SELECT name FROM sqlite_master WHERE type='table'"
list_query = "SELECT name FROM sqlite_master WHERE type='table'"
else:
logger.warning(
"unable to drop all tables for unknown database: %s", self.DBMS_NAME
@ -154,7 +153,7 @@ class ConnectionWrap(object):
self._use_database(database)
logger.info(
"currentDatabase : %s",
execute_request(f"SELECT currentDatabase()", self).get_result(),
execute_request("SELECT currentDatabase()", self).get_result(),
)
@contextmanager
@ -174,7 +173,7 @@ class ConnectionWrap(object):
def __exit__(self, *args):
if hasattr(self._connection, "close"):
return self._connection.close()
self._connection.close()
def setup_connection(engine, conn_str=None, make_debug_request=True):
@ -263,7 +262,7 @@ class ExecResult:
def assert_no_exception(self):
if self.has_exception():
raise ProgramError(
f"request doesn't have a result set, it has the exception",
"request doesn't have a result set, it has the exception",
parent=self._exception,
)

View File

@ -1,8 +1,6 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from enum import Enum
class Error(Exception):
def __init__(
@ -45,16 +43,8 @@ class Error(Exception):
@property
def reason(self):
return ", ".join(
(
str(x)
for x in [
super().__str__(),
"details: {}".format(self._details) if self._details else "",
]
if x
)
)
details = f"details: {self._details}" if self._details else ""
return ", ".join((str(x) for x in [super().__str__(), details] if x))
def set_details(self, file=None, name=None, pos=None, request=None, details=None):
if file is not None:
@ -88,16 +78,8 @@ class ErrorWithParent(Error):
@property
def reason(self):
return ", ".join(
(
str(x)
for x in [
super().reason,
"exception: {}".format(str(self._parent)) if self._parent else "",
]
if x
)
)
exception = f"exception: {self._parent}" if self._parent else ""
return ", ".join((str(x) for x in [super().reason, exception] if x))
class ProgramError(ErrorWithParent):

View File

@ -2,20 +2,25 @@
# -*- coding: utf-8 -*-
import argparse
import enum
import os
import logging
import csv
import enum
import json
import logging
import multiprocessing
import os
from functools import reduce
from deepdiff import DeepDiff
from connection import setup_connection, Engines, default_clickhouse_odbc_conn_str
from test_runner import TestRunner, Status, RequestType
# isort: off
from deepdiff import DeepDiff # pylint:disable=import-error; for style check
# isort: on
LEVEL_NAMES = [x.lower() for x in logging._nameToLevel.keys() if x != logging.NOTSET]
from connection import Engines, default_clickhouse_odbc_conn_str, setup_connection
from test_runner import RequestType, Status, TestRunner
LEVEL_NAMES = [ # pylint:disable-next=protected-access
l.lower() for l, n in logging._nameToLevel.items() if n != logging.NOTSET
]
def setup_logger(args):
@ -41,7 +46,7 @@ def __write_check_status(status_row, out_dir):
if len(status_row) > 140:
status_row = status_row[0:135] + "..."
check_status_path = os.path.join(out_dir, "check_status.tsv")
with open(check_status_path, "a") as stream:
with open(check_status_path, "a", encoding="utf-8") as stream:
writer = csv.writer(stream, delimiter="\t", lineterminator="\n")
writer.writerow(status_row)
@ -60,7 +65,7 @@ def __write_test_result(
):
all_stages = reports.keys()
test_results_path = os.path.join(out_dir, "test_results.tsv")
with open(test_results_path, "a") as stream:
with open(test_results_path, "a", encoding="utf-8") as stream:
writer = csv.writer(stream, delimiter="\t", lineterminator="\n")
for stage in all_stages:
report = reports[stage]
@ -182,7 +187,7 @@ def mode_check_statements(parser):
input_dir, f"check statements:: not a dir {input_dir}"
)
reports = dict()
reports = {}
out_stages_dir = os.path.join(out_dir, f"{args.mode}-stages")
@ -242,7 +247,7 @@ def mode_check_complete(parser):
input_dir, f"check statements:: not a dir {input_dir}"
)
reports = dict()
reports = {}
out_stages_dir = os.path.join(out_dir, f"{args.mode}-stages")
@ -286,9 +291,9 @@ def make_actual_report(reports):
return {stage: report.get_map() for stage, report in reports.items()}
def write_actual_report(actial, out_dir):
with open(os.path.join(out_dir, "actual_report.json"), "w") as f:
f.write(json.dumps(actial))
def write_actual_report(actual, out_dir):
with open(os.path.join(out_dir, "actual_report.json"), "w", encoding="utf-8") as f:
f.write(json.dumps(actual))
def read_canonic_report(input_dir):
@ -296,13 +301,15 @@ def read_canonic_report(input_dir):
if not os.path.exists(file):
return {}
with open(os.path.join(input_dir, "canonic_report.json"), "r") as f:
with open(
os.path.join(input_dir, "canonic_report.json"), "r", encoding="utf-8"
) as f:
data = f.read()
return json.loads(data)
def write_canonic_report(canonic, out_dir):
with open(os.path.join(out_dir, "canonic_report.json"), "w") as f:
with open(os.path.join(out_dir, "canonic_report.json"), "w", encoding="utf-8") as f:
f.write(json.dumps(canonic))
@ -370,7 +377,7 @@ def mode_self_test(parser):
if not os.path.isdir(out_dir):
raise NotADirectoryError(out_dir, f"self test: not a dir {out_dir}")
reports = dict()
reports = {}
out_stages_dir = os.path.join(out_dir, f"{args.mode}-stages")

View File

@ -2,24 +2,27 @@
# -*- coding: utf-8 -*-
import logging
import os
from itertools import chain
from enum import Enum
from hashlib import md5
from functools import reduce
from hashlib import md5
from itertools import chain
# isort: off
# pylint:disable=import-error; for style check
import sqlglot
from sqlglot.expressions import PrimaryKeyColumnConstraint, ColumnDef
from sqlglot.expressions import ColumnDef, PrimaryKeyColumnConstraint
# pylint:enable=import-error; for style check
# isort: on
from exceptions import (
Error,
ProgramError,
ErrorWithParent,
DataResultDiffer,
Error,
ErrorWithParent,
ProgramError,
QueryExecutionError,
)
logger = logging.getLogger("parser")
logger.setLevel(logging.DEBUG)
@ -248,6 +251,7 @@ class FileBlockBase:
)
block.with_result(result)
return block
raise ValueError(f"Unknown block_type {block_type}")
def dump_to(self, output):
if output is None:
@ -258,9 +262,6 @@ class FileBlockBase:
class FileBlockComments(FileBlockBase):
def __init__(self, parser, start, end):
super().__init__(parser, start, end)
def get_block_type(self):
return BlockType.comments
@ -469,20 +470,18 @@ class QueryResult:
(
str(x)
for x in [
"rows: {}".format(self.rows) if self.rows else "",
"values_count: {}".format(self.values_count)
if self.values_count
else "",
"data_hash: {}".format(self.data_hash) if self.data_hash else "",
"exception: {}".format(self.exception) if self.exception else "",
"hash_threshold: {}".format(self.hash_threshold)
f"rows: {self.rows}" if self.rows else "",
f"values_count: {self.values_count}" if self.values_count else "",
f"data_hash: {self.data_hash}" if self.data_hash else "",
f"exception: {self.exception}" if self.exception else "",
f"hash_threshold: {self.hash_threshold}"
if self.hash_threshold
else "",
]
if x
)
)
return "QueryResult({})".format(params)
return f"QueryResult({params})"
def __iter__(self):
if self.rows is not None:
@ -491,12 +490,10 @@ class QueryResult:
if self.values_count <= self.hash_threshold:
return iter(self.rows)
if self.data_hash is not None:
return iter(
[["{} values hashing to {}".format(self.values_count, self.data_hash)]]
)
return iter([[f"{self.values_count} values hashing to {self.data_hash}"]])
if self.exception is not None:
return iter([["exception: {}".format(self.exception)]])
raise ProgramError("Query result is empty", details="{}".format(self.__str__()))
return iter([[f"exception: {self.exception}"]])
raise ProgramError("Query result is empty", details=str(self))
@staticmethod
def __value_count(rows):
@ -528,7 +525,7 @@ class QueryResult:
for row in rows:
res_row = []
for c, t in zip(row, types):
logger.debug(f"Builging row. c:{c} t:{t}")
logger.debug("Builging row. c:%s t:%s", c, t)
if c is None:
res_row.append("NULL")
continue
@ -541,7 +538,7 @@ class QueryResult:
elif t == "I":
try:
res_row.append(str(int(c)))
except ValueError as ex:
except ValueError:
# raise QueryExecutionError(
# f"Got non-integer result '{c}' for I type."
# )
@ -549,7 +546,7 @@ class QueryResult:
except OverflowError as ex:
raise QueryExecutionError(
f"Got overflowed result '{c}' for I type."
)
) from ex
elif t == "R":
res_row.append(f"{c:.3f}")
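The overflow handler above now re-raises with an explicit cause, satisfying pylint's raise-missing-from check. A self-contained sketch of the same chaining, using a stand-in QueryExecutionError class rather than the project's own exception:

class QueryExecutionError(Exception):
    pass

def int_repr(value) -> str:
    try:
        return str(int(value))
    except OverflowError as ex:
        # "from ex" keeps the original traceback attached as __cause__.
        raise QueryExecutionError(f"Got overflowed result '{value}' for I type.") from ex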
@ -567,6 +564,7 @@ class QueryResult:
values = list(chain(*rows))
values.sort()
return [values] if values else []
return []
@staticmethod
def __calculate_hash(rows):
@ -595,9 +593,9 @@ class QueryResult:
# do not print details to the test file
# but print original exception
if isinstance(e, ErrorWithParent):
message = "{}, original is: {}".format(e, e.get_parent())
message = f"{e}, original is: {e.get_parent()}"
else:
message = "{}".format(e)
message = str(e)
return QueryResult(exception=message)
@ -616,7 +614,6 @@ class QueryResult:
"canonic and actual results have different exceptions",
details=f"canonic: {canonic.exception}, actual: {actual.exception}",
)
else:
# exceptions are the same
return
elif canonic.exception is not None:
@ -639,9 +636,8 @@ class QueryResult:
if canonic.values_count != actual.values_count:
raise DataResultDiffer(
"canonic and actual results have different value count",
details="canonic values count {}, actual {}".format(
canonic.values_count, actual.values_count
),
details=f"canonic values count {canonic.values_count}, "
f"actual {actual.values_count}",
)
if canonic.data_hash != actual.data_hash:
raise DataResultDiffer(
@ -653,9 +649,8 @@ class QueryResult:
if canonic.values_count != actual.values_count:
raise DataResultDiffer(
"canonic and actual results have different value count",
details="canonic values count {}, actual {}".format(
canonic.values_count, actual.values_count
),
details=f"canonic values count {canonic.values_count}, "
f"actual {actual.values_count}",
)
if canonic.rows != actual.rows:
raise DataResultDiffer(
@ -665,5 +660,5 @@ class QueryResult:
raise ProgramError(
"Unable to compare results",
details="actual {}, canonic {}".format(actual, canonic),
details=f"actual {actual}, canonic {canonic}",
)
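The long details messages above are split across adjacent f-string literals, which Python concatenates into a single string. A tiny demonstration with made-up counts:

canonic_count, actual_count = 10, 12
details = (
    f"canonic values count {canonic_count}, "
    f"actual {actual_count}"
)
assert details == "canonic values count 10, actual 12"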

View File

@ -1,25 +1,23 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import enum
import logging
import os
import traceback
import io
import json
import logging
import os
import test_parser
from connection import execute_request
from exceptions import (
DataResultDiffer,
Error,
ProgramError,
DataResultDiffer,
StatementExecutionError,
StatementSuccess,
QueryExecutionError,
QuerySuccess,
SchemeResultDiffer,
StatementExecutionError,
StatementSuccess,
)
from connection import execute_request
logger = logging.getLogger("parser")
logger.setLevel(logging.DEBUG)
@ -55,6 +53,7 @@ class Status(str, enum.Enum):
class TestStatus:
def __init__(self):
self.name = None
self.status = None
self.file = None
self.position = None
@ -155,7 +154,7 @@ class SimpleStats:
self.success += 1
def get_map(self):
result = dict()
result = {}
result["success"] = self.success
result["fail"] = self.fail
return result
@ -187,7 +186,7 @@ class Stats:
choose.update(status)
def get_map(self):
result = dict()
result = {}
result["statements"] = self.statements.get_map()
result["queries"] = self.queries.get_map()
result["total"] = self.total.get_map()
@ -205,7 +204,7 @@ class OneReport:
self.test_name = test_name
self.test_file = test_file
self.stats = Stats()
self.requests = dict() # type: dict(int, TestStatus)
self.requests = {}
def update(self, status):
if not isinstance(status, TestStatus):
@ -218,11 +217,11 @@ class OneReport:
return str(self.get_map())
def get_map(self):
result = dict()
result = {}
result["test_name"] = self.test_name
result["test_file"] = self.test_file
result["stats"] = self.stats.get_map()
result["requests"] = dict()
result["requests"] = {}
requests = result["requests"]
for pos, status in self.requests.items():
requests[pos] = status.get_map()
@ -233,7 +232,7 @@ class Report:
def __init__(self, dbms_name, input_dir=None):
self.dbms_name = dbms_name
self.stats = Stats()
self.tests = dict() # type: dict(str, OneReport)
self.tests = {}
self.input_dir = input_dir
self.output_dir = None
@ -256,7 +255,7 @@ class Report:
self.output_dir = res_dir
def get_map(self):
result = dict()
result = {}
result["dbms_name"] = self.dbms_name
result["stats"] = self.stats.get_map()
result["input_dir"] = self.input_dir
@ -264,7 +263,7 @@ class Report:
result["input_dir"] = self.input_dir
if self.output_dir is not None:
result["output_dir"] = self.output_dir
result["tests"] = dict()
result["tests"] = {}
tests = result["tests"]
for test_name, one_report in self.tests.items():
tests.update({test_name: one_report.get_map()})
@ -297,8 +296,8 @@ class Report:
def write_report(self, report_dir):
report_path = os.path.join(report_dir, "report.json")
logger.info(f"create file {report_path}")
with open(report_path, "w") as stream:
logger.info("create file %s", report_path)
with open(report_path, "w", encoding="utf-8") as stream:
stream.write(json.dumps(self.get_map(), indent=4))
@ -434,16 +433,13 @@ class TestRunner:
details=f"expected error: {expected_error}",
parent=exec_res.get_exception(),
)
else:
clogger.debug("errors matched")
raise QuerySuccess()
else:
clogger.debug("missed error")
raise QueryExecutionError(
"query is expected to fail with error",
details="expected error: {}".format(expected_error),
details=f"expected error: {expected_error}",
)
else:
clogger.debug("success is expected")
if exec_res.has_exception():
clogger.debug("had error")
@ -460,7 +456,6 @@ class TestRunner:
test_parser.QueryResult.assert_eq(canonic, actual)
block.with_result(actual)
raise QuerySuccess()
else:
clogger.debug("completion mode")
raise QueryExecutionError(
"query execution failed with an exception",
@ -476,9 +471,8 @@ class TestRunner:
if canonic_columns_count != actual_columns_count:
raise SchemeResultDiffer(
"canonic and actual columns count differ",
details="expected columns {}, actual columns {}".format(
canonic_columns_count, actual_columns_count
),
details=f"expected columns {canonic_columns_count}, "
f"actual columns {actual_columns_count}",
)
actual = test_parser.QueryResult.make_it(
@ -528,7 +522,7 @@ class TestRunner:
self.report = Report(self.dbms_name, self._input_dir)
if self.results is None:
self.results = dict()
self.results = {}
if self.dbms_name == "ClickHouse" and test_name in [
"test/select5.test",
@ -536,7 +530,7 @@ class TestRunner:
"test/evidence/slt_lang_replace.test",
"test/evidence/slt_lang_droptrigger.test",
]:
logger.info(f"Let's skip test %s for ClickHouse", test_name)
logger.info("Let's skip test %s for ClickHouse", test_name)
return
with self.connection.with_one_test_scope():
@ -565,7 +559,7 @@ class TestRunner:
test_name = os.path.relpath(test_file, start=self._input_dir)
logger.debug("open file %s", test_name)
with open(test_file, "r") as stream:
with open(test_file, "r", encoding="utf-8") as stream:
self.run_one_test(stream, test_name, test_file)
def run_all_tests_from_dir(self, input_dir):
@ -582,10 +576,10 @@ class TestRunner:
for test_name, stream in self.results.items():
test_file = os.path.join(dir_path, test_name)
logger.info(f"create file {test_file}")
logger.info("create file %s", test_file)
result_dir = os.path.dirname(test_file)
os.makedirs(result_dir, exist_ok=True)
with open(test_file, "w") as output:
with open(test_file, "w", encoding="utf-8") as output:
output.write(stream.getvalue())
def write_report(self, report_dir):

View File

@ -152,8 +152,15 @@ find $ROOT_PATH/{src,base,programs,utils} -name '*.xml' |
grep -vP $EXCLUDE_DIRS |
xargs xmllint --noout --nonet
# FIXME: for now only clickhouse-test
pylint --rcfile=$ROOT_PATH/.pylintrc --persistent=no --score=n $ROOT_PATH/tests/clickhouse-test $ROOT_PATH/tests/ci/*.py
function xargs-pylint {
# $1 is the maximum number of arguments per pylint process
sort | awk '$2=="text/x-script.python" {print $1}' | \
xargs -P "$(nproc)" -n "$1" pylint --rcfile="$ROOT_PATH/pyproject.toml" --persistent=no --score=n
}
find "$ROOT_PATH/tests" -maxdepth 2 -type f -exec file -F' ' --mime-type {} + | xargs-pylint 50
# Beware, the lambdas there are checked. All of them contain `app`, and it causes brain-cucumber-zalgo
find "$ROOT_PATH/tests/ci" -mindepth 2 -type f -exec file -F' ' --mime-type {} + | xargs-pylint 1
find $ROOT_PATH -not -path $ROOT_PATH'/contrib*' \( -name '*.yaml' -or -name '*.yml' \) -type f |
grep -vP $EXCLUDE_DIRS |