From 7ca8ea9d23ccc3ec7cae89a6696dfb71a4e72c0c Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sat, 13 Jan 2024 23:53:01 +0100 Subject: [PATCH] Simplify stateful tests --- docker/test/stateful/Dockerfile | 5 - docker/test/stateful/run.sh | 348 +++++++++++++++++++++++++++++- docker/test/stateful/s3downloader | 126 ----------- utils/s3tools/s3uploader | 222 ------------------- 4 files changed, 337 insertions(+), 364 deletions(-) delete mode 100755 docker/test/stateful/s3downloader delete mode 100755 utils/s3tools/s3uploader diff --git a/docker/test/stateful/Dockerfile b/docker/test/stateful/Dockerfile index f513735a2d0..cc6e13a632d 100644 --- a/docker/test/stateful/Dockerfile +++ b/docker/test/stateful/Dockerfile @@ -11,11 +11,6 @@ RUN apt-get update -y \ npm \ && apt-get clean -COPY s3downloader /s3downloader - -ENV S3_URL="https://clickhouse-datasets.s3.amazonaws.com" -ENV DATASETS="hits visits" - # The following is already done in clickhouse/stateless-test # RUN npm install -g azurite # RUN npm install tslib diff --git a/docker/test/stateful/run.sh b/docker/test/stateful/run.sh index 9079246429f..503ee2c301c 100755 --- a/docker/test/stateful/run.sh +++ b/docker/test/stateful/run.sh @@ -97,20 +97,346 @@ start setup_logs_replication -# shellcheck disable=SC2086 # No quotes because I want to split it into words. -/s3downloader --url-prefix "$S3_URL" --dataset-names $DATASETS -chmod 777 -R /var/lib/clickhouse clickhouse-client --query "SHOW DATABASES" +clickhouse-client --query "CREATE DATABASE datasets" -clickhouse-client --query "ATTACH DATABASE datasets ENGINE = Ordinary" +clickhouse-client --query " +ATTACH TABLE datasets.hits_v1 UUID '78ebf6a1-d987-4579-b3ec-00c1a087b1f3' +( + WatchID UInt64, + JavaEnable UInt8, + Title String, + GoodEvent Int16, + EventTime DateTime, + EventDate Date, + CounterID UInt32, + ClientIP UInt32, + ClientIP6 FixedString(16), + RegionID UInt32, + UserID UInt64, + CounterClass Int8, + OS UInt8, + UserAgent UInt8, + URL String, + Referer String, + URLDomain String, + RefererDomain String, + Refresh UInt8, + IsRobot UInt8, + RefererCategories Array(UInt16), + URLCategories Array(UInt16), + URLRegions Array(UInt32), + RefererRegions Array(UInt32), + ResolutionWidth UInt16, + ResolutionHeight UInt16, + ResolutionDepth UInt8, + FlashMajor UInt8, + FlashMinor UInt8, + FlashMinor2 String, + NetMajor UInt8, + NetMinor UInt8, + UserAgentMajor UInt16, + UserAgentMinor FixedString(2), + CookieEnable UInt8, + JavascriptEnable UInt8, + IsMobile UInt8, + MobilePhone UInt8, + MobilePhoneModel String, + Params String, + IPNetworkID UInt32, + TraficSourceID Int8, + SearchEngineID UInt16, + SearchPhrase String, + AdvEngineID UInt8, + IsArtifical UInt8, + WindowClientWidth UInt16, + WindowClientHeight UInt16, + ClientTimeZone Int16, + ClientEventTime DateTime, + SilverlightVersion1 UInt8, + SilverlightVersion2 UInt8, + SilverlightVersion3 UInt32, + SilverlightVersion4 UInt16, + PageCharset String, + CodeVersion UInt32, + IsLink UInt8, + IsDownload UInt8, + IsNotBounce UInt8, + FUniqID UInt64, + HID UInt32, + IsOldCounter UInt8, + IsEvent UInt8, + IsParameter UInt8, + DontCountHits UInt8, + WithHash UInt8, + HitColor FixedString(1), + UTCEventTime DateTime, + Age UInt8, + Sex UInt8, + Income UInt8, + Interests UInt16, + Robotness UInt8, + GeneralInterests Array(UInt16), + RemoteIP UInt32, + RemoteIP6 FixedString(16), + WindowName Int32, + OpenerName Int32, + HistoryLength Int16, + BrowserLanguage FixedString(2), + BrowserCountry FixedString(2), + SocialNetwork String, + SocialAction String, + HTTPError UInt16, + SendTiming Int32, + DNSTiming Int32, + ConnectTiming Int32, + ResponseStartTiming Int32, + ResponseEndTiming Int32, + FetchTiming Int32, + RedirectTiming Int32, + DOMInteractiveTiming Int32, + DOMContentLoadedTiming Int32, + DOMCompleteTiming Int32, + LoadEventStartTiming Int32, + LoadEventEndTiming Int32, + NSToDOMContentLoadedTiming Int32, + FirstPaintTiming Int32, + RedirectCount Int8, + SocialSourceNetworkID UInt8, + SocialSourcePage String, + ParamPrice Int64, + ParamOrderID String, + ParamCurrency FixedString(3), + ParamCurrencyID UInt16, + GoalsReached Array(UInt32), + OpenstatServiceName String, + OpenstatCampaignID String, + OpenstatAdID String, + OpenstatSourceID String, + UTMSource String, + UTMMedium String, + UTMCampaign String, + UTMContent String, + UTMTerm String, + FromTag String, + HasGCLID UInt8, + RefererHash UInt64, + URLHash UInt64, + CLID UInt32, + YCLID UInt64, + ShareService String, + ShareURL String, + ShareTitle String, + ParsedParams.Key1 Array(String), + ParsedParams.Key2 Array(String), + ParsedParams.Key3 Array(String), + ParsedParams.Key4 Array(String), + ParsedParams.Key5 Array(String), + ParsedParams.ValueDouble Array(Float64), + IslandID FixedString(16), + RequestNum UInt32, + RequestTry UInt8 +) +ENGINE = MergeTree +PARTITION BY toYYYYMM(EventDate) +ORDER BY (CounterID, EventDate, intHash32(UserID)) +SAMPLE BY intHash32(UserID) +SETTINGS disk = disk(type = cache, path = '/dev/shm/clickhouse/', max_size = '16G', + disk = disk(type = web, endpoint = 'https://clickhouse-datasets-web.s3.us-east-1.amazonaws.com/')); +" -service clickhouse-server restart - -# Wait for server to start accepting connections -for _ in {1..120}; do - clickhouse-client --query "SELECT 1" && break - sleep 1 -done +clickhouse-client --query " +ATTACH TABLE datasets.visits_v1 UUID '5131f834-711f-4168-98a5-968b691a104b' +( + CounterID UInt32, + StartDate Date, + Sign Int8, + IsNew UInt8, + VisitID UInt64, + UserID UInt64, + StartTime DateTime, + Duration UInt32, + UTCStartTime DateTime, + PageViews Int32, + Hits Int32, + IsBounce UInt8, + Referer String, + StartURL String, + RefererDomain String, + StartURLDomain String, + EndURL String, + LinkURL String, + IsDownload UInt8, + TraficSourceID Int8, + SearchEngineID UInt16, + SearchPhrase String, + AdvEngineID UInt8, + PlaceID Int32, + RefererCategories Array(UInt16), + URLCategories Array(UInt16), + URLRegions Array(UInt32), + RefererRegions Array(UInt32), + IsYandex UInt8, + GoalReachesDepth Int32, + GoalReachesURL Int32, + GoalReachesAny Int32, + SocialSourceNetworkID UInt8, + SocialSourcePage String, + MobilePhoneModel String, + ClientEventTime DateTime, + RegionID UInt32, + ClientIP UInt32, + ClientIP6 FixedString(16), + RemoteIP UInt32, + RemoteIP6 FixedString(16), + IPNetworkID UInt32, + SilverlightVersion3 UInt32, + CodeVersion UInt32, + ResolutionWidth UInt16, + ResolutionHeight UInt16, + UserAgentMajor UInt16, + UserAgentMinor UInt16, + WindowClientWidth UInt16, + WindowClientHeight UInt16, + SilverlightVersion2 UInt8, + SilverlightVersion4 UInt16, + FlashVersion3 UInt16, + FlashVersion4 UInt16, + ClientTimeZone Int16, + OS UInt8, + UserAgent UInt8, + ResolutionDepth UInt8, + FlashMajor UInt8, + FlashMinor UInt8, + NetMajor UInt8, + NetMinor UInt8, + MobilePhone UInt8, + SilverlightVersion1 UInt8, + Age UInt8, + Sex UInt8, + Income UInt8, + JavaEnable UInt8, + CookieEnable UInt8, + JavascriptEnable UInt8, + IsMobile UInt8, + BrowserLanguage UInt16, + BrowserCountry UInt16, + Interests UInt16, + Robotness UInt8, + GeneralInterests Array(UInt16), + Params Array(String), + Goals.ID Array(UInt32), + Goals.Serial Array(UInt32), + Goals.EventTime Array(DateTime), + Goals.Price Array(Int64), + Goals.OrderID Array(String), + Goals.CurrencyID Array(UInt32), + WatchIDs Array(UInt64), + ParamSumPrice Int64, + ParamCurrency FixedString(3), + ParamCurrencyID UInt16, + ClickLogID UInt64, + ClickEventID Int32, + ClickGoodEvent Int32, + ClickEventTime DateTime, + ClickPriorityID Int32, + ClickPhraseID Int32, + ClickPageID Int32, + ClickPlaceID Int32, + ClickTypeID Int32, + ClickResourceID Int32, + ClickCost UInt32, + ClickClientIP UInt32, + ClickDomainID UInt32, + ClickURL String, + ClickAttempt UInt8, + ClickOrderID UInt32, + ClickBannerID UInt32, + ClickMarketCategoryID UInt32, + ClickMarketPP UInt32, + ClickMarketCategoryName String, + ClickMarketPPName String, + ClickAWAPSCampaignName String, + ClickPageName String, + ClickTargetType UInt16, + ClickTargetPhraseID UInt64, + ClickContextType UInt8, + ClickSelectType Int8, + ClickOptions String, + ClickGroupBannerID Int32, + OpenstatServiceName String, + OpenstatCampaignID String, + OpenstatAdID String, + OpenstatSourceID String, + UTMSource String, + UTMMedium String, + UTMCampaign String, + UTMContent String, + UTMTerm String, + FromTag String, + HasGCLID UInt8, + FirstVisit DateTime, + PredLastVisit Date, + LastVisit Date, + TotalVisits UInt32, + TraficSource.ID Array(Int8), + TraficSource.SearchEngineID Array(UInt16), + TraficSource.AdvEngineID Array(UInt8), + TraficSource.PlaceID Array(UInt16), + TraficSource.SocialSourceNetworkID Array(UInt8), + TraficSource.Domain Array(String), + TraficSource.SearchPhrase Array(String), + TraficSource.SocialSourcePage Array(String), + Attendance FixedString(16), + CLID UInt32, + YCLID UInt64, + NormalizedRefererHash UInt64, + SearchPhraseHash UInt64, + RefererDomainHash UInt64, + NormalizedStartURLHash UInt64, + StartURLDomainHash UInt64, + NormalizedEndURLHash UInt64, + TopLevelDomain UInt64, + URLScheme UInt64, + OpenstatServiceNameHash UInt64, + OpenstatCampaignIDHash UInt64, + OpenstatAdIDHash UInt64, + OpenstatSourceIDHash UInt64, + UTMSourceHash UInt64, + UTMMediumHash UInt64, + UTMCampaignHash UInt64, + UTMContentHash UInt64, + UTMTermHash UInt64, + FromHash UInt64, + WebVisorEnabled UInt8, + WebVisorActivity UInt32, + ParsedParams.Key1 Array(String), + ParsedParams.Key2 Array(String), + ParsedParams.Key3 Array(String), + ParsedParams.Key4 Array(String), + ParsedParams.Key5 Array(String), + ParsedParams.ValueDouble Array(Float64), + Market.Type Array(UInt8), + Market.GoalID Array(UInt32), + Market.OrderID Array(String), + Market.OrderPrice Array(Int64), + Market.PP Array(UInt32), + Market.DirectPlaceID Array(UInt32), + Market.DirectOrderID Array(UInt32), + Market.DirectBannerID Array(UInt32), + Market.GoodID Array(String), + Market.GoodName Array(String), + Market.GoodQuantity Array(Int32), + Market.GoodPrice Array(Int64), + IslandID FixedString(16) +) +ENGINE = CollapsingMergeTree(Sign) +PARTITION BY toYYYYMM(StartDate) +ORDER BY (CounterID, StartDate, intHash32(UserID), VisitID) +SAMPLE BY intHash32(UserID) +SETTINGS disk = disk(type = cache, path = '/dev/shm/clickhouse/', max_size = '16G', + disk = disk(type = web, endpoint = 'https://clickhouse-datasets-web.s3.us-east-1.amazonaws.com/')); +" clickhouse-client --query "SHOW TABLES FROM datasets" diff --git a/docker/test/stateful/s3downloader b/docker/test/stateful/s3downloader deleted file mode 100755 index 77601fb5af6..00000000000 --- a/docker/test/stateful/s3downloader +++ /dev/null @@ -1,126 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -import os -import sys -import time -import tarfile -import logging -import argparse -import requests -import tempfile - - -DEFAULT_URL = "https://clickhouse-datasets.s3.amazonaws.com" - -AVAILABLE_DATASETS = { - "hits": "hits_v1.tar", - "visits": "visits_v1.tar", -} - -RETRIES_COUNT = 5 - - -def _get_temp_file_name(): - return os.path.join( - tempfile._get_default_tempdir(), next(tempfile._get_candidate_names()) - ) - - -def build_url(base_url, dataset): - return os.path.join(base_url, dataset, "partitions", AVAILABLE_DATASETS[dataset]) - - -def download_with_progress(url, path): - logging.info("Downloading from %s to temp path %s", url, path) - for i in range(RETRIES_COUNT): - try: - with open(path, "wb") as f: - response = requests.get(url, stream=True) - response.raise_for_status() - total_length = response.headers.get("content-length") - if total_length is None or int(total_length) == 0: - logging.info( - "No content-length, will download file without progress" - ) - f.write(response.content) - else: - dl = 0 - total_length = int(total_length) - logging.info("Content length is %ld bytes", total_length) - for data in response.iter_content(chunk_size=4096): - dl += len(data) - f.write(data) - if sys.stdout.isatty(): - done = int(50 * dl / total_length) - percent = int(100 * float(dl) / total_length) - sys.stdout.write( - "\r[{}{}] {}%".format( - "=" * done, " " * (50 - done), percent - ) - ) - sys.stdout.flush() - break - except Exception as ex: - sys.stdout.write("\n") - time.sleep(3) - logging.info("Exception while downloading %s, retry %s", ex, i + 1) - if os.path.exists(path): - os.remove(path) - else: - raise Exception( - "Cannot download dataset from {}, all retries exceeded".format(url) - ) - - sys.stdout.write("\n") - logging.info("Downloading finished") - - -def unpack_to_clickhouse_directory(tar_path, clickhouse_path): - logging.info( - "Will unpack data from temp path %s to clickhouse db %s", - tar_path, - clickhouse_path, - ) - with tarfile.open(tar_path, "r") as comp_file: - comp_file.extractall(path=clickhouse_path) - logging.info("Unpack finished") - - -if __name__ == "__main__": - logging.basicConfig(level=logging.INFO) - - parser = argparse.ArgumentParser( - description="Simple tool for dowloading datasets for clickhouse from S3" - ) - - parser.add_argument( - "--dataset-names", - required=True, - nargs="+", - choices=list(AVAILABLE_DATASETS.keys()), - ) - parser.add_argument("--url-prefix", default=DEFAULT_URL) - parser.add_argument("--clickhouse-data-path", default="/var/lib/clickhouse/") - - args = parser.parse_args() - datasets = args.dataset_names - logging.info("Will fetch following datasets: %s", ", ".join(datasets)) - for dataset in datasets: - logging.info("Processing %s", dataset) - temp_archive_path = _get_temp_file_name() - try: - download_url_for_dataset = build_url(args.url_prefix, dataset) - download_with_progress(download_url_for_dataset, temp_archive_path) - unpack_to_clickhouse_directory(temp_archive_path, args.clickhouse_data_path) - except Exception as ex: - logging.info("Some exception occured %s", str(ex)) - raise - finally: - logging.info( - "Will remove downloaded file %s from filesystem if it exists", - temp_archive_path, - ) - if os.path.exists(temp_archive_path): - os.remove(temp_archive_path) - logging.info("Processing of %s finished", dataset) - logging.info("Fetch finished, enjoy your tables!") diff --git a/utils/s3tools/s3uploader b/utils/s3tools/s3uploader deleted file mode 100755 index d53661614c0..00000000000 --- a/utils/s3tools/s3uploader +++ /dev/null @@ -1,222 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -import os -import logging -import argparse -import tarfile -import math - -try: - from boto.s3.connection import S3Connection - from boto.s3.key import Key -except ImportError: - raise ImportError("You have to install boto package 'pip install boto'") - - -class S3API(object): - def __init__(self, access_key, secret_access_key, mds_api, mds_url): - self.connection = S3Connection( - host=mds_api, - aws_access_key_id=access_key, - aws_secret_access_key=secret_access_key, - ) - self.mds_url = mds_url - - def upload_file(self, bucket_name, file_path, s3_path): - logging.info("Start uploading file to bucket %s", bucket_name) - - bucket = self.connection.get_bucket(bucket_name) - key = bucket.initiate_multipart_upload(s3_path) - logging.info("Will upload to s3 path %s", s3_path) - chunksize = 1024 * 1024 * 1024 # 1 GB - filesize = os.stat(file_path).st_size - logging.info("File size is %s", filesize) - chunkcount = int(math.ceil(filesize / chunksize)) - - def call_back(x, y): - print("Uploaded {}/{} bytes".format(x, y)) - - try: - for i in range(chunkcount + 1): - logging.info("Uploading chunk %s of %s", i, chunkcount + 1) - offset = chunksize * i - bytes_size = min(chunksize, filesize - offset) - - with open(file_path, "r") as fp: - fp.seek(offset) - key.upload_part_from_file( - fp=fp, part_num=i + 1, size=bytes_size, cb=call_back, num_cb=100 - ) - key.complete_upload() - except Exception as ex: - key.cancel_upload() - raise ex - logging.info("Contents were set") - return "https://{bucket}.{mds_url}/{path}".format( - bucket=bucket_name, mds_url=self.mds_url, path=s3_path - ) - - def set_file_contents(self, bucket, local_file_path, s3_file_path): - key = Key(bucket) - key.key = s3_file_path - file_size = os.stat(local_file_path).st_size - logging.info( - "Uploading file `%s` to `%s`. Size is %s", - local_file_path, - s3_file_path, - file_size, - ) - - def call_back(x, y): - print("Uploaded {}/{} bytes".format(x, y)) - - key.set_contents_from_filename(local_file_path, cb=call_back) - - def upload_data_for_static_files_disk(self, bucket_name, directory_path, s3_path): - bucket = self.connection.get_bucket(bucket_name) - if s3_path.endswith("/"): - s3_path += "store/" - else: - s3_path += "/store/" - print(s3_path) - for root, dirs, files in os.walk(directory_path): - path = root.split(os.sep) - for file in files: - local_file_path = os.path.join(root, file) - s3_file = local_file_path[len(directory_path) + 1 :] - s3_file_path = os.path.join(s3_path, s3_file) - self.set_file_contents(bucket, local_file_path, s3_file_path) - - logging.info("Uploading finished") - return "https://{bucket}.{mds_url}/{path}".format( - bucket=bucket_name, mds_url=self.mds_url, path=s3_path - ) - - def list_bucket_keys(self, bucket_name): - bucket = self.connection.get_bucket(bucket_name) - for obj in bucket.get_all_keys(): - print(obj.key) - - def remove_folder_from_bucket(self, bucket_name, folder_path): - bucket = self.connection.get_bucket(bucket_name) - bucket.get_all_keys() - for obj in bucket.get_all_keys(): - if obj.key.startswith(folder_path): - print("Removing " + obj.key) - obj.delete() - - -def make_tar_file_for_table(clickhouse_data_path, db_name, table_name, tmp_prefix): - relative_data_path = os.path.join("data", db_name, table_name) - relative_meta_path = os.path.join("metadata", db_name, table_name + ".sql") - path_to_data = os.path.join(clickhouse_data_path, relative_data_path) - path_to_metadata = os.path.join(clickhouse_data_path, relative_meta_path) - temporary_file_name = tmp_prefix + "/{tname}.tar".format(tname=table_name) - with tarfile.open(temporary_file_name, "w") as bundle: - bundle.add(path_to_data, arcname=relative_data_path) - bundle.add(path_to_metadata, arcname=relative_meta_path) - return temporary_file_name - - -USAGE_EXAMPLES = """ -examples: -\t./s3uploader --dataset-name some_ds --access-key-id XXX --secret-access-key YYY --clickhouse-data-path /opt/clickhouse/ --table-name default.some_tbl --bucket-name some-bucket -\t./s3uploader --dataset-name some_ds --access-key-id XXX --secret-access-key YYY --file-path some_ds.tsv.xz --bucket-name some-bucket --s3-path /path/to/ -""" - -if __name__ == "__main__": - logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s") - - parser = argparse.ArgumentParser( - description="Simple tool for uploading datasets to clickhouse S3", - usage="%(prog)s [options] {}".format(USAGE_EXAMPLES), - ) - parser.add_argument("--s3-api-url", default="s3.amazonaws.com") - parser.add_argument("--s3-common-url", default="s3.amazonaws.com") - parser.add_argument("--bucket-name", default="clickhouse-datasets") - parser.add_argument( - "--dataset-name", - required=True, - help="Name of dataset, will be used in uploaded path", - ) - parser.add_argument("--access-key-id", required=True) - parser.add_argument("--secret-access-key", required=True) - parser.add_argument( - "--clickhouse-data-path", - default="/var/lib/clickhouse/", - help="Path to clickhouse database on filesystem", - ) - parser.add_argument("--s3-path", help="Path in s3, where to upload file") - parser.add_argument( - "--tmp-prefix", default="/tmp", help="Prefix to store temporary downloaded file" - ) - data_group = parser.add_mutually_exclusive_group(required=True) - table_name_argument = data_group.add_argument( - "--table-name", - help="Name of table with database, if you are uploading partitions", - ) - data_group.add_argument("--file-path", help="Name of file, if you are uploading") - data_group.add_argument( - "--directory-path", help="Path to directory with files to upload" - ) - data_group.add_argument( - "--list-directory", help="List s3 directory by --directory-path" - ) - data_group.add_argument( - "--remove-directory", help="Remove s3 directory by --directory-path" - ) - args = parser.parse_args() - - if args.table_name is not None and args.clickhouse_data_path is None: - raise argparse.ArgumentError( - table_name_argument, - "You should specify --clickhouse-data-path to upload --table", - ) - - s3_conn = S3API( - args.access_key_id, args.secret_access_key, args.s3_api_url, args.s3_common_url - ) - - file_path = "" - directory_path = args.directory_path - s3_path = args.s3_path - - if args.list_directory: - s3_conn.list_bucket_keys(args.bucket_name) - elif args.remove_directory: - print("Removing s3 path: " + args.remove_directory) - s3_conn.remove_folder_from_bucket(args.bucket_name, args.remove_directory) - elif args.directory_path is not None: - url = s3_conn.upload_data_for_static_files_disk( - args.bucket_name, directory_path, s3_path - ) - logging.info("Data uploaded: %s", url) - else: - if args.table_name is not None: - if "." not in args.table_name: - db_name = "default" - else: - db_name, table_name = args.table_name.split(".") - file_path = make_tar_file_for_table( - args.clickhouse_data_path, db_name, table_name, args.tmp_prefix - ) - else: - file_path = args.file_path - - if "tsv" in file_path: - s3_path = os.path.join( - args.dataset_name, "tsv", os.path.basename(file_path) - ) - if args.table_name is not None: - s3_path = os.path.join( - args.dataset_name, "partitions", os.path.basename(file_path) - ) - elif args.s3_path is not None: - s3_path = os.path.join( - args.dataset_name, args.s3_path, os.path.basename(file_path) - ) - else: - raise Exception("Don't know s3-path to upload") - - url = s3_conn.upload_file(args.bucket_name, file_path, s3_path) - logging.info("Data uploaded: %s", url)