Migrate S3Helper to pathlib.Path

Mikhail f. Shiryaev 2023-08-29 16:35:53 +02:00
parent 42ce81e62a
commit e18d9d39e8
10 changed files with 138 additions and 140 deletions

View File

@@ -160,7 +160,7 @@ def main():
     s3_helper = S3Helper()
     for f in paths:
         try:
-            paths[f] = s3_helper.upload_test_report_to_s3(paths[f], s3_prefix + f)
+            paths[f] = s3_helper.upload_test_report_to_s3(Path(paths[f]), s3_prefix + f)
         except Exception as ex:
             logging.info("Exception uploading file %s text %s", f, ex)
             paths[f] = ""

View File

@@ -345,7 +345,7 @@ def main():
         os.remove(performance_path)
 
     build_urls = (
-        s3_helper.upload_build_folder_to_s3(
+        s3_helper.upload_build_directory_to_s3(
             build_output_path,
             s3_path_prefix,
             keep_dirs_in_s3_path=False,

View File

@@ -6,7 +6,7 @@ import os
 import sys
 import time
 from pathlib import Path
-from typing import Any, Callable, List
+from typing import Any, Callable, List, Union
 
 import requests  # type: ignore
@@ -98,7 +98,7 @@ def get_build_name_for_check(check_name: str) -> str:
     return CI_CONFIG.test_configs[check_name].required_build
 
 
-def read_build_urls(build_name: str, reports_path: str) -> List[str]:
+def read_build_urls(build_name: str, reports_path: Union[Path, str]) -> List[str]:
     for root, _, files in os.walk(reports_path):
         for f in files:
             if build_name in f:

View File

@@ -5,6 +5,7 @@ import logging
 import os
 import sys
 import atexit
+from pathlib import Path
 from typing import Dict, List, Tuple
 
 from github import Github
@@ -118,11 +119,10 @@ def get_build_name_from_file_name(file_name):
 def main():
     logging.basicConfig(level=logging.INFO)
-    temp_path = TEMP_PATH
+    temp_path = Path(TEMP_PATH)
     logging.info("Reports path %s", REPORTS_PATH)
 
-    if not os.path.exists(temp_path):
-        os.makedirs(temp_path)
+    temp_path.mkdir(parents=True, exist_ok=True)
 
     build_check_name = sys.argv[1]
     needs_data = {}  # type: NeedsDataType
@@ -242,9 +242,8 @@ def main():
         commit_url,
     )
 
-    report_path = os.path.join(temp_path, "report.html")
-    with open(report_path, "w", encoding="utf-8") as fd:
-        fd.write(report)
+    report_path = temp_path / "report.html"
+    report_path.write_text(report, encoding="utf-8")
 
     logging.info("Going to upload prepared report")
     context_name_for_path = build_check_name.lower().replace(" ", "_")
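
Both replacements above are standard pathlib equivalences: a guarded os.makedirs collapses into one idempotent mkdir call, and open/write collapses into write_text. A small self-contained sketch, with an illustrative directory name:

    from pathlib import Path
    import tempfile

    temp_path = Path(tempfile.gettempdir()) / "report-demo"  # illustrative

    # if not os.path.exists(p): os.makedirs(p)  ->  one idempotent call
    temp_path.mkdir(parents=True, exist_ok=True)

    # open(p, "w") + fd.write(...)  ->  write_text() opens, writes and closes
    report_path = temp_path / "report.html"
    report_path.write_text("<html></html>", encoding="utf-8")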

View File

@@ -6,7 +6,8 @@ import time
 import subprocess
 import logging
 
-from typing import List, Optional
+from pathlib import Path
+from typing import List, Optional, Union
 
 
 class DockerImage:
@@ -22,7 +23,7 @@ class DockerImage:
 
 
 def get_images_with_versions(
-    reports_path: str,
+    reports_path: Union[Path, str],
     required_images: List[str],
     pull: bool = True,
     version: Optional[str] = None,
@@ -80,7 +81,10 @@ def get_images_with_versions(
 
 
 def get_image_with_version(
-    reports_path: str, image: str, pull: bool = True, version: Optional[str] = None
+    reports_path: Union[Path, str],
+    image: str,
+    pull: bool = True,
+    version: Optional[str] = None,
 ) -> DockerImage:
     logging.info("Looking for images file in %s", reports_path)
     return get_images_with_versions(reports_path, [image], pull, version=version)[0]

View File

@@ -54,23 +54,21 @@ def get_fasttest_cmd(workspace, output_path, repo_path, pr_number, commit_sha, image):
     )
 
 
-def process_results(result_folder: str) -> Tuple[str, str, TestResults, List[str]]:
+def process_results(result_folder: Path) -> Tuple[str, str, TestResults, List[str]]:
     test_results = []  # type: TestResults
     additional_files = []
     # Just upload all files from result_folder.
     # If task provides processed results, then it's responsible for content of
     # result_folder
-    if os.path.exists(result_folder):
+    if result_folder.exists():
         test_files = [
-            f
-            for f in os.listdir(result_folder)
-            if os.path.isfile(os.path.join(result_folder, f))
-        ]
-        additional_files = [os.path.join(result_folder, f) for f in test_files]
+            f for f in result_folder.iterdir() if f.is_file()
+        ]  # type: List[Path]
+        additional_files = [f.absolute().as_posix() for f in test_files]
 
     status = []
-    status_path = os.path.join(result_folder, "check_status.tsv")
-    if os.path.exists(status_path):
+    status_path = result_folder / "check_status.tsv"
+    if status_path.exists():
         logging.info("Found test_results.tsv")
         with open(status_path, "r", encoding="utf-8") as status_file:
             status = list(csv.reader(status_file, delimiter="\t"))
@@ -80,7 +78,7 @@ def process_results(result_folder: str) -> Tuple[str, str, TestResults, List[str]]:
     state, description = status[0][0], status[0][1]
 
     try:
-        results_path = Path(result_folder) / "test_results.tsv"
+        results_path = result_folder / "test_results.tsv"
         test_results = read_test_results(results_path)
         if len(test_results) == 0:
             return "error", "Empty test_results.tsv", test_results, additional_files
@@ -100,10 +98,9 @@ def main():
     stopwatch = Stopwatch()
 
-    temp_path = TEMP_PATH
+    temp_path = Path(TEMP_PATH)
 
-    if not os.path.exists(temp_path):
-        os.makedirs(temp_path)
+    temp_path.mkdir(parents=True, exist_ok=True)
 
     pr_info = PRInfo()
@@ -124,17 +121,14 @@ def main():
     s3_helper = S3Helper()
 
-    workspace = os.path.join(temp_path, "fasttest-workspace")
-    if not os.path.exists(workspace):
-        os.makedirs(workspace)
+    workspace = temp_path / "fasttest-workspace"
+    workspace.mkdir(parents=True, exist_ok=True)
 
-    output_path = os.path.join(temp_path, "fasttest-output")
-    if not os.path.exists(output_path):
-        os.makedirs(output_path)
+    output_path = temp_path / "fasttest-output"
+    output_path.mkdir(parents=True, exist_ok=True)
 
-    repo_path = os.path.join(temp_path, "fasttest-repo")
-    if not os.path.exists(repo_path):
-        os.makedirs(repo_path)
+    repo_path = temp_path / "fasttest-repo"
+    repo_path.mkdir(parents=True, exist_ok=True)
 
     run_cmd = get_fasttest_cmd(
         workspace,
@@ -146,11 +140,10 @@ def main():
     )
 
     logging.info("Going to run fasttest with cmd %s", run_cmd)
 
-    logs_path = os.path.join(temp_path, "fasttest-logs")
-    if not os.path.exists(logs_path):
-        os.makedirs(logs_path)
+    logs_path = temp_path / "fasttest-logs"
+    logs_path.mkdir(parents=True, exist_ok=True)
 
-    run_log_path = os.path.join(logs_path, "run.log")
+    run_log_path = logs_path / "run.log"
     with TeePopen(run_cmd, run_log_path, timeout=90 * 60) as process:
         retcode = process.wait()
         if retcode == 0:
@@ -161,9 +154,7 @@ def main():
     subprocess.check_call(f"sudo chown -R ubuntu:ubuntu {temp_path}", shell=True)
 
     test_output_files = os.listdir(output_path)
-    additional_logs = []
-    for f in test_output_files:
-        additional_logs.append(os.path.join(output_path, f))
+    additional_logs = [os.path.join(output_path, f) for f in test_output_files]
 
     test_log_exists = (
         "test_log.txt" in test_output_files or "test_result.txt" in test_output_files
@@ -194,8 +185,8 @@ def main():
         pr_info.sha,
         "fast_tests",
     )
-    build_urls = s3_helper.upload_build_folder_to_s3(
-        os.path.join(output_path, "binaries"),
+    build_urls = s3_helper.upload_build_directory_to_s3(
+        output_path / "binaries",
         s3_path_prefix,
         keep_dirs_in_s3_path=False,
         upload_symlinks=False,
@@ -206,7 +197,7 @@ def main():
         pr_info.number,
         pr_info.sha,
         test_results,
-        [run_log_path] + additional_logs,
+        [run_log_path.as_posix()] + additional_logs,
         NAME,
         build_urls,
     )
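
The process_results() rewrite above replaces the os.listdir/os.path.isfile/os.path.join triple with a single iterdir() pass; since iterdir() yields Path objects, is_file() needs no join. A minimal sketch of the idiom (directory name illustrative):

    from pathlib import Path

    result_folder = Path("fasttest-output")  # illustrative
    if result_folder.exists():
        test_files = [f for f in result_folder.iterdir() if f.is_file()]
        # as_posix() hands plain strings back to not-yet-migrated callers
        additional_files = [f.absolute().as_posix() for f in test_files]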

View File

@@ -7,6 +7,7 @@ import json
 import subprocess
 import traceback
 import re
+from pathlib import Path
 from typing import Dict
 
 from github import Github
@@ -218,15 +219,17 @@ if __name__ == "__main__":
     uploaded = {}  # type: Dict[str, str]
     for name, path in paths.items():
         try:
-            uploaded[name] = s3_helper.upload_test_report_to_s3(path, s3_prefix + name)
+            uploaded[name] = s3_helper.upload_test_report_to_s3(
+                Path(path), s3_prefix + name
+            )
         except Exception:
             uploaded[name] = ""
             traceback.print_exc()
 
     # Upload all images and flamegraphs to S3
     try:
-        s3_helper.upload_test_folder_to_s3(
-            os.path.join(result_path, "images"), s3_prefix + "images"
+        s3_helper.upload_test_directory_to_s3(
+            Path(result_path) / "images", s3_prefix + "images"
         )
     except Exception:
         traceback.print_exc()

View File

@@ -52,12 +52,14 @@ class S3Helper:
         self.host = S3_URL
         self.download_host = S3_DOWNLOAD
 
-    def _upload_file_to_s3(self, bucket_name: str, file_path: str, s3_path: str) -> str:
+    def _upload_file_to_s3(
+        self, bucket_name: str, file_path: Path, s3_path: str
+    ) -> str:
         logging.debug(
             "Start uploading %s to bucket=%s path=%s", file_path, bucket_name, s3_path
         )
         metadata = {}
-        if os.path.getsize(file_path) < 64 * 1024 * 1024:
+        if file_path.stat().st_size < 64 * 1024 * 1024:
             if (
                 s3_path.endswith("txt")
                 or s3_path.endswith("log")
@@ -97,17 +99,14 @@ class S3Helper:
             if re.search(r"\.(txt|log|err|out)$", s3_path) or re.search(
                 r"\.log\..*(?<!\.zst)$", s3_path
             ):
+                compressed_path = file_path.with_suffix(file_path.suffix + ".zst")
                 logging.info(
                     "Going to compress file log file %s to %s",
                     file_path,
-                    file_path + ".zst",
+                    compressed_path,
                 )
-                # FIXME: rewrite S3 to Path
-                _file_path = Path(file_path)
-                compress_file_fast(
-                    _file_path, _file_path.with_suffix(_file_path.suffix + ".zst")
-                )
-                file_path += ".zst"
+                compress_file_fast(file_path, compressed_path)
+                file_path = compressed_path
                 s3_path += ".zst"
             else:
                 logging.info("Processing file without compression")
@@ -121,22 +120,20 @@ class S3Helper:
         logging.info("Upload %s to %s. Meta: %s", file_path, url, metadata)
         return url
 
-    def upload_test_report_to_s3(self, file_path: str, s3_path: str) -> str:
+    def upload_test_report_to_s3(self, file_path: Path, s3_path: str) -> str:
         if CI:
             return self._upload_file_to_s3(S3_TEST_REPORTS_BUCKET, file_path, s3_path)
-        else:
-            return S3Helper.copy_file_to_local(
-                S3_TEST_REPORTS_BUCKET, file_path, s3_path
-            )
 
-    def upload_build_file_to_s3(self, file_path, s3_path):
+        return S3Helper.copy_file_to_local(S3_TEST_REPORTS_BUCKET, file_path, s3_path)
+
+    def upload_build_file_to_s3(self, file_path: Path, s3_path: str) -> str:
         if CI:
             return self._upload_file_to_s3(S3_BUILDS_BUCKET, file_path, s3_path)
-        else:
-            return S3Helper.copy_file_to_local(S3_BUILDS_BUCKET, file_path, s3_path)
+
+        return S3Helper.copy_file_to_local(S3_BUILDS_BUCKET, file_path, s3_path)
 
     def fast_parallel_upload_dir(
-        self, dir_path: Union[str, Path], s3_dir_path: str, bucket_name: str
+        self, dir_path: Path, s3_dir_path: str, bucket_name: str
     ) -> List[str]:
         all_files = []
@@ -196,37 +193,37 @@ class S3Helper:
             logging.basicConfig(level=original_level)
         return result
 
-    def _upload_folder_to_s3(
+    def _upload_directory_to_s3(
         self,
-        folder_path,
-        s3_folder_path,
-        bucket_name,
-        keep_dirs_in_s3_path,
-        upload_symlinks,
-    ):
+        directory_path: Path,
+        s3_directory_path: str,
+        bucket_name: str,
+        keep_dirs_in_s3_path: bool,
+        upload_symlinks: bool,
+    ) -> List[str]:
         logging.info(
-            "Upload folder '%s' to bucket=%s of s3 folder '%s'",
-            folder_path,
+            "Upload directory '%s' to bucket=%s of s3 directory '%s'",
+            directory_path,
             bucket_name,
-            s3_folder_path,
+            s3_directory_path,
         )
-        if not os.path.exists(folder_path):
+        if not directory_path.exists():
             return []
-        files = os.listdir(folder_path)
+        files = list(directory_path.iterdir())
         if not files:
             return []
 
        p = Pool(min(len(files), 5))
 
-        def task(file_name):
-            full_fs_path = os.path.join(folder_path, file_name)
+        def task(file_path: Path) -> Union[str, List[str]]:
+            full_fs_path = file_path.absolute()
             if keep_dirs_in_s3_path:
-                full_s3_path = s3_folder_path + "/" + os.path.basename(folder_path)
+                full_s3_path = os.path.join(s3_directory_path, directory_path.name)
             else:
-                full_s3_path = s3_folder_path
+                full_s3_path = s3_directory_path
 
             if os.path.isdir(full_fs_path):
-                return self._upload_folder_to_s3(
+                return self._upload_directory_to_s3(
                     full_fs_path,
                     full_s3_path,
                     bucket_name,
@@ -234,60 +231,63 @@ class S3Helper:
                     upload_symlinks,
                 )
 
-            if os.path.islink(full_fs_path):
+            if full_fs_path.is_symlink():
                 if upload_symlinks:
                     if CI:
                         return self._upload_file_to_s3(
-                            bucket_name, full_fs_path, full_s3_path + "/" + file_name
-                        )
-                    else:
-                        return S3Helper.copy_file_to_local(
-                            bucket_name, full_fs_path, full_s3_path + "/" + file_name
+                            bucket_name,
+                            full_fs_path,
+                            full_s3_path + "/" + file_path.name,
                         )
+                    return S3Helper.copy_file_to_local(
+                        bucket_name, full_fs_path, full_s3_path + "/" + file_path.name
+                    )
                 return []
 
             if CI:
                 return self._upload_file_to_s3(
-                    bucket_name, full_fs_path, full_s3_path + "/" + file_name
-                )
-            else:
-                return S3Helper.copy_file_to_local(
-                    bucket_name, full_fs_path, full_s3_path + "/" + file_name
+                    bucket_name, full_fs_path, full_s3_path + "/" + file_path.name
                 )
+            return S3Helper.copy_file_to_local(
+                bucket_name, full_fs_path, full_s3_path + "/" + file_path.name
+            )
 
         return sorted(_flatten_list(list(p.map(task, files))))
 
-    def upload_build_folder_to_s3(
+    def upload_build_directory_to_s3(
         self,
-        folder_path,
-        s3_folder_path,
-        keep_dirs_in_s3_path=True,
-        upload_symlinks=True,
-    ):
-        return self._upload_folder_to_s3(
-            folder_path,
-            s3_folder_path,
+        directory_path: Path,
+        s3_directory_path: str,
+        keep_dirs_in_s3_path: bool = True,
+        upload_symlinks: bool = True,
+    ) -> List[str]:
+        return self._upload_directory_to_s3(
+            directory_path,
+            s3_directory_path,
             S3_BUILDS_BUCKET,
             keep_dirs_in_s3_path,
             upload_symlinks,
         )
 
-    def upload_test_folder_to_s3(
+    def upload_test_directory_to_s3(
         self,
-        folder_path,
-        s3_folder_path,
-        keep_dirs_in_s3_path=True,
-        upload_symlinks=True,
-    ):
-        return self._upload_folder_to_s3(
-            folder_path,
-            s3_folder_path,
+        directory_path: Path,
+        s3_directory_path: str,
+        keep_dirs_in_s3_path: bool = True,
+        upload_symlinks: bool = True,
+    ) -> List[str]:
+        return self._upload_directory_to_s3(
+            directory_path,
+            s3_directory_path,
            S3_TEST_REPORTS_BUCKET,
            keep_dirs_in_s3_path,
            upload_symlinks,
        )
 
-    def list_prefix(self, s3_prefix_path, bucket=S3_BUILDS_BUCKET):
+    def list_prefix(
+        self, s3_prefix_path: str, bucket: str = S3_BUILDS_BUCKET
+    ) -> List[str]:
         objects = self.client.list_objects_v2(Bucket=bucket, Prefix=s3_prefix_path)
         result = []
         if "Contents" in objects:
@@ -296,7 +296,7 @@ class S3Helper:
 
         return result
 
-    def exists(self, key, bucket=S3_BUILDS_BUCKET):
+    def exists(self, key: str, bucket: str = S3_BUILDS_BUCKET) -> bool:
         try:
             self.client.head_object(Bucket=bucket, Key=key)
             return True
@@ -304,13 +304,12 @@ class S3Helper:
             return False
 
     @staticmethod
-    def copy_file_to_local(bucket_name: str, file_path: str, s3_path: str) -> str:
-        local_path = os.path.abspath(
-            os.path.join(RUNNER_TEMP, "s3", bucket_name, s3_path)
-        )
-        local_dir = os.path.dirname(local_path)
-        if not os.path.exists(local_dir):
-            os.makedirs(local_dir)
+    def copy_file_to_local(bucket_name: str, file_path: Path, s3_path: str) -> str:
+        local_path = (
+            Path(RUNNER_TEMP) / "s3" / os.path.join(bucket_name, s3_path)
+        ).absolute()
+        local_dir = local_path.parent
+        local_dir.mkdir(parents=True, exist_ok=True)
         shutil.copy(file_path, local_path)
 
         logging.info("Copied %s to %s", file_path, local_path)

View File

@@ -4,6 +4,8 @@ import logging
 import subprocess
 import os
 import sys
+from pathlib import Path
+from typing import Dict
 
 from github import Github
@@ -47,13 +49,12 @@ def main():
     stopwatch = Stopwatch()
 
-    temp_path = TEMP_PATH
-    reports_path = REPORTS_PATH
+    temp_path = Path(TEMP_PATH)
+    reports_path = Path(REPORTS_PATH)
 
     check_name = sys.argv[1]
 
-    if not os.path.exists(temp_path):
-        os.makedirs(temp_path)
+    temp_path.mkdir(parents=True, exist_ok=True)
 
     pr_info = PRInfo()
@@ -82,7 +83,7 @@ def main():
     logging.info("Got build url %s", build_url)
 
-    workspace_path = os.path.join(temp_path, "workspace")
+    workspace_path = temp_path / "workspace"
     if not os.path.exists(workspace_path):
         os.makedirs(workspace_path)
@@ -91,7 +92,7 @@ def main():
     )
 
     logging.info("Going to run %s", run_command)
-    run_log_path = os.path.join(temp_path, "run.log")
+    run_log_path = temp_path / "run.log"
     with open(run_log_path, "w", encoding="utf-8") as log:
         with subprocess.Popen(
             run_command, shell=True, stderr=log, stdout=log
@@ -110,23 +111,24 @@ def main():
     s3_prefix = f"{pr_info.number}/{pr_info.sha}/sqltest_{check_name_lower}/"
     paths = {
         "run.log": run_log_path,
-        "server.log.zst": os.path.join(workspace_path, "server.log.zst"),
-        "server.err.log.zst": os.path.join(workspace_path, "server.err.log.zst"),
-        "report.html": os.path.join(workspace_path, "report.html"),
-        "test.log": os.path.join(workspace_path, "test.log"),
+        "server.log.zst": workspace_path / "server.log.zst",
+        "server.err.log.zst": workspace_path / "server.err.log.zst",
+        "report.html": workspace_path / "report.html",
+        "test.log": workspace_path / "test.log",
     }
+    path_urls = {}  # type: Dict[str, str]
 
     s3_helper = S3Helper()
     for f in paths:
         try:
-            paths[f] = s3_helper.upload_test_report_to_s3(paths[f], s3_prefix + f)
+            path_urls[f] = s3_helper.upload_test_report_to_s3(paths[f], s3_prefix + f)
         except Exception as ex:
             logging.info("Exception uploading file %s text %s", f, ex)
-            paths[f] = ""
+            path_urls[f] = ""
 
     report_url = GITHUB_RUN_URL
-    if paths["report.html"]:
-        report_url = paths["report.html"]
+    if path_urls["report.html"]:
+        report_url = path_urls["report.html"]
 
     status = "success"
     description = "See the report"

View File

@@ -34,7 +34,7 @@ def process_logs(
             test_result.log_urls.append(processed_logs[path])
         elif path:
             url = s3_client.upload_test_report_to_s3(
-                path.as_posix(), s3_path_prefix + "/" + path.name
+                path, s3_path_prefix + "/" + path.name
             )
             test_result.log_urls.append(url)
             processed_logs[path] = url
@@ -44,7 +44,7 @@ def process_logs(
         if log_path:
             additional_urls.append(
                 s3_client.upload_test_report_to_s3(
-                    log_path, s3_path_prefix + "/" + os.path.basename(log_path)
+                    Path(log_path), s3_path_prefix + "/" + os.path.basename(log_path)
                 )
             )
@@ -100,9 +100,9 @@ def upload_results(
         additional_urls,
         statuscolors=statuscolors,
     )
-    with open("report.html", "w", encoding="utf-8") as f:
-        f.write(html_report)
+    report_path = Path("report.html")
+    report_path.write_text(html_report, encoding="utf-8")
 
-    url = s3_client.upload_test_report_to_s3("report.html", s3_path_prefix + ".html")
+    url = s3_client.upload_test_report_to_s3(report_path, s3_path_prefix + ".html")
     logging.info("Search result in url %s", url)
     return url