ClickHouse/tests/ci/s3_helper.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

317 lines
10 KiB
Python
Raw Normal View History

2021-09-10 14:27:03 +00:00
# -*- coding: utf-8 -*-
import hashlib
import logging
import os
2021-11-26 14:00:09 +00:00
import re
import shutil
2021-12-20 19:43:37 +00:00
import time
2021-09-10 14:27:03 +00:00
from multiprocessing.dummy import Pool
from pathlib import Path
from typing import List, Union
2021-11-26 14:00:09 +00:00
import boto3 # type: ignore
2023-05-09 15:57:23 +00:00
import botocore # type: ignore
2021-11-26 14:00:09 +00:00
2022-08-11 13:01:32 +00:00
from env_helper import (
S3_TEST_REPORTS_BUCKET,
S3_BUILDS_BUCKET,
RUNNER_TEMP,
CI,
S3_URL,
S3_DOWNLOAD,
)
2021-09-10 14:27:03 +00:00
from compress_files import compress_file_fast
2021-11-26 14:00:09 +00:00
2021-09-10 14:27:03 +00:00
def _md5(fname):
    """Return the hexadecimal MD5 digest of the file at ``fname``.

    The file is read in 4096-byte pieces so it is never loaded into memory
    as a whole. The resulting digest is also reported at DEBUG level.
    """
    digest = hashlib.md5()
    with open(fname, "rb") as stream:
        while True:
            piece = stream.read(4096)
            if not piece:
                # An empty read means end of file.
                break
            digest.update(piece)
    hex_digest = digest.hexdigest()
    logging.debug("MD5 for %s is %s", fname, hex_digest)
    return hex_digest
def _flatten_list(lst):
    """Return a new flat list with every nested ``list`` in ``lst`` expanded.

    Only ``list`` instances are expanded (recursively, in order); any other
    element, including tuples, is kept as a single item.
    """
    flat = []
    for item in lst:
        if not isinstance(item, list):
            flat.append(item)
            continue
        flat.extend(_flatten_list(item))
    return flat
2021-11-26 14:00:09 +00:00
class S3Helper:
    """Helper around a boto3 S3 client for uploading CI artifacts.

    The single-file and folder upload methods honour the ``CI`` flag from
    ``env_helper``: when it is falsy, files are copied below
    ``RUNNER_TEMP/s3/<bucket>/`` instead of being sent to S3.
    ``fast_parallel_upload_dir`` always talks to S3.
    """

    # Size of the botocore connection pool and upper bound for the thread
    # pool used by fast_parallel_upload_dir.
    max_pool_size = 100

    # Suffix -> Content-Type for web assets. The suffixes intentionally have
    # no leading dot to match the historical ``endswith`` checks.
    _WEB_CONTENT_TYPES = (
        ("html", "text/html; charset=utf-8"),
        ("css", "text/css; charset=utf-8"),
        ("js", "text/javascript; charset=utf-8"),
    )

    def __init__(self):
        """Create the boto3 session and S3 client for the configured endpoint."""
        config = botocore.config.Config(max_pool_connections=self.max_pool_size)
        self.session = boto3.session.Session(region_name="us-east-1")
        self.client = self.session.client("s3", endpoint_url=S3_URL, config=config)
        self.host = S3_URL
        self.download_host = S3_DOWNLOAD

    @classmethod
    def _web_content_type(cls, s3_path):
        """Return the Content-Type for html/css/js paths, or None otherwise."""
        for suffix, content_type in cls._WEB_CONTENT_TYPES:
            if s3_path.endswith(suffix):
                return content_type
        return None

    def _upload_or_copy(self, bucket_name, file_path, s3_path):
        """Upload to S3 when running in CI, otherwise copy to the local mirror."""
        if CI:
            return self._upload_file_to_s3(bucket_name, file_path, s3_path)
        return S3Helper.copy_file_to_local(bucket_name, file_path, s3_path)

    def _upload_file_to_s3(self, bucket_name: str, file_path: str, s3_path: str) -> str:
        """Upload one file to ``bucket_name`` under ``s3_path`` and return its URL.

        Files smaller than 64 MiB get a ``ContentType`` derived from the
        suffix of ``s3_path`` (plain text, html, css, js). Larger files get no
        content type; if they look like text logs they are compressed with
        ``compress_file_fast`` first and uploaded with a ``.zst`` suffix.

        Returns the download URL with ``+`` and spaces percent-encoded.
        Exceptions from ``os.path.getsize`` or the S3 client propagate.
        """
        logging.debug(
            "Start uploading %s to bucket=%s path=%s", file_path, bucket_name, s3_path
        )

        metadata = {}
        if os.path.getsize(file_path) < 64 * 1024 * 1024:
            if s3_path.endswith(("txt", "log", "err", "out")):
                content_type = "text/plain; charset=utf-8"
            else:
                content_type = self._web_content_type(s3_path)

            if content_type is not None:
                metadata["ContentType"] = content_type
                # Log the value actually set (the js branch used to log the
                # css content type by mistake).
                logging.info(
                    "Content type %s for file path %s", content_type, file_path
                )
            else:
                logging.info("No content type provied for %s", file_path)
        else:
            # Matches "*.txt|log|err|out" and rotated logs such as
            # "*.log.1", but not already compressed "*.log.*.zst".
            if re.search(r"\.(txt|log|err|out)$", s3_path) or re.search(
                r"\.log\..*(?<!\.zst)$", s3_path
            ):
                logging.info(
                    "Going to compress file log file %s to %s",
                    file_path,
                    file_path + ".zst",
                )
                # FIXME: rewrite S3 to Path
                _file_path = Path(file_path)
                compress_file_fast(
                    _file_path, _file_path.with_suffix(_file_path.suffix + ".zst")
                )
                file_path += ".zst"
                s3_path += ".zst"
            else:
                logging.info("Processing file without compression")
            logging.info("File is too large, do not provide content type")

        self.client.upload_file(file_path, bucket_name, s3_path, ExtraArgs=metadata)
        logging.info("Upload %s to %s. Meta: %s", file_path, s3_path, metadata)
        # last two replacements are specifics of AWS urls:
        # https://jamesd3142.wordpress.com/2018/02/28/amazon-s3-and-the-plus-symbol/
        url = f"{self.download_host}/{bucket_name}/{s3_path}"
        return url.replace("+", "%2B").replace(" ", "%20")

    def upload_test_report_to_s3(self, file_path: str, s3_path: str) -> str:
        """Store a file in the test-reports bucket (or its local mirror)."""
        return self._upload_or_copy(S3_TEST_REPORTS_BUCKET, file_path, s3_path)

    def upload_build_file_to_s3(self, file_path, s3_path):
        """Store a file in the builds bucket (or its local mirror)."""
        return self._upload_or_copy(S3_BUILDS_BUCKET, file_path, s3_path)

    def fast_parallel_upload_dir(
        self, dir_path: Union[str, Path], s3_dir_path: str, bucket_name: str
    ) -> List[str]:
        """Upload every file below ``dir_path`` to S3 using a thread pool.

        The leading ``dir_path`` of each file path is replaced with
        ``s3_dir_path`` to form the object key. Each upload is attempted up to
        five times. A file that still fails is logged at CRITICAL level and
        skipped; its URL is nevertheless included in the result.

        Returns the sorted list of download URLs (not percent-encoded).
        """
        all_files = [
            os.path.join(root, name)
            for root, _, names in os.walk(dir_path)
            for name in names
        ]
        logging.info("Files found %s", len(all_files))
        if not all_files:
            # Nothing to do; avoid spinning up a thread pool.
            return []

        # Progress statistics shared by the worker threads. They are updated
        # without a lock, so the printed numbers are approximate.
        counter = 0
        t = time.time()
        sum_time = 0

        def upload_task(file_path: str) -> str:
            nonlocal counter
            nonlocal t
            nonlocal sum_time
            # Replace only the leading directory: a plain replace() would also
            # rewrite later occurrences of dir_path inside the path.
            s3_path = file_path.replace(str(dir_path), s3_dir_path, 1)
            try:
                metadata = {}
                content_type = self._web_content_type(s3_path)
                if content_type is not None:
                    metadata["ContentType"] = content_type

                # Retry
                for i in range(5):
                    try:
                        self.client.upload_file(
                            file_path, bucket_name, s3_path, ExtraArgs=metadata
                        )
                        break
                    except Exception:
                        if i == 4:
                            raise
                        time.sleep(0.1 * i)

                counter += 1
                if counter % 1000 == 0:
                    sum_time += int(time.time() - t)
                    print(
                        f"Uploaded {counter}, {int(time.time()-t)}s, "
                        f"sum time {sum_time}s",
                    )
                    t = time.time()
            except Exception as ex:
                logging.critical("Failed to upload file, expcetion %s", ex)
            return f"{self.download_host}/{bucket_name}/{s3_path}"

        # Silence everything below CRITICAL while uploading. basicConfig()
        # cannot be used for this: it is a no-op once the root logger has
        # handlers, so neither the silencing nor the restore would happen.
        original_level = logging.root.level
        logging.root.setLevel(logging.CRITICAL)
        try:
            with Pool(min(self.max_pool_size, len(all_files))) as pool:
                result = sorted(_flatten_list(pool.map(upload_task, all_files)))
        finally:
            logging.root.setLevel(original_level)
        return result

    def _upload_folder_to_s3(
        self,
        folder_path,
        s3_folder_path,
        bucket_name,
        keep_dirs_in_s3_path,
        upload_symlinks,
    ):
        """Recursively upload (or locally copy) the content of ``folder_path``.

        When ``keep_dirs_in_s3_path`` is true the base name of each folder is
        appended to the destination prefix. Symlinks that do not resolve to a
        directory are skipped unless ``upload_symlinks`` is true.

        Returns a sorted flat list of the resulting URLs; an empty list when
        the folder is missing or empty.
        """
        logging.info(
            "Upload folder '%s' to bucket=%s of s3 folder '%s'",
            folder_path,
            bucket_name,
            s3_folder_path,
        )
        if not os.path.exists(folder_path):
            return []
        files = os.listdir(folder_path)
        if not files:
            return []

        # The destination prefix is the same for every entry of this folder.
        if keep_dirs_in_s3_path:
            full_s3_path = s3_folder_path + "/" + os.path.basename(folder_path)
        else:
            full_s3_path = s3_folder_path

        def task(file_name):
            full_fs_path = os.path.join(folder_path, file_name)

            # isdir() follows symlinks, so linked directories are descended
            # into regardless of upload_symlinks.
            if os.path.isdir(full_fs_path):
                return self._upload_folder_to_s3(
                    full_fs_path,
                    full_s3_path,
                    bucket_name,
                    keep_dirs_in_s3_path,
                    upload_symlinks,
                )

            if os.path.islink(full_fs_path) and not upload_symlinks:
                return []

            return self._upload_or_copy(
                bucket_name, full_fs_path, full_s3_path + "/" + file_name
            )

        # The context manager releases the worker threads; without it every
        # (recursive) call would leave a pool behind.
        with Pool(min(len(files), 5)) as pool:
            return sorted(_flatten_list(pool.map(task, files)))

    def upload_build_folder_to_s3(
        self,
        folder_path,
        s3_folder_path,
        keep_dirs_in_s3_path=True,
        upload_symlinks=True,
    ):
        """Upload a folder to the builds bucket; see ``_upload_folder_to_s3``."""
        return self._upload_folder_to_s3(
            folder_path,
            s3_folder_path,
            S3_BUILDS_BUCKET,
            keep_dirs_in_s3_path,
            upload_symlinks,
        )

    def upload_test_folder_to_s3(
        self,
        folder_path,
        s3_folder_path,
        keep_dirs_in_s3_path=True,
        upload_symlinks=True,
    ):
        """Upload a folder to the test-reports bucket; see ``_upload_folder_to_s3``."""
        return self._upload_folder_to_s3(
            folder_path,
            s3_folder_path,
            S3_TEST_REPORTS_BUCKET,
            keep_dirs_in_s3_path,
            upload_symlinks,
        )

    def list_prefix(self, s3_prefix_path, bucket=S3_BUILDS_BUCKET):
        """Return the keys of all objects in ``bucket`` under ``s3_prefix_path``.

        A paginator is used because a single ``list_objects_v2`` call returns
        at most 1000 keys, which would silently truncate larger listings.
        """
        paginator = self.client.get_paginator("list_objects_v2")
        result = []
        for page in paginator.paginate(Bucket=bucket, Prefix=s3_prefix_path):
            result.extend(obj["Key"] for obj in page.get("Contents", []))
        return result

    def exists(self, key, bucket=S3_BUILDS_BUCKET):
        """Return True if ``head_object`` succeeds for ``key`` in ``bucket``.

        Any exception raised by the request, not only "not found", is
        reported as False.
        """
        try:
            self.client.head_object(Bucket=bucket, Key=key)
            return True
        except Exception:
            return False

    @staticmethod
    def copy_file_to_local(bucket_name: str, file_path: str, s3_path: str) -> str:
        """Copy ``file_path`` to ``RUNNER_TEMP/s3/<bucket_name>/<s3_path>``.

        Missing parent directories are created. Returns a ``file://`` URL
        pointing at the absolute path of the copy.
        """
        local_path = os.path.abspath(
            os.path.join(RUNNER_TEMP, "s3", bucket_name, s3_path)
        )
        # exist_ok avoids a FileExistsError when several pool threads create
        # the same directory at once.
        os.makedirs(os.path.dirname(local_path), exist_ok=True)
        shutil.copy(file_path, local_path)

        logging.info("Copied %s to %s", file_path, local_path)
        return f"file://{local_path}"