Simplify stateful tests

Alexey Milovidov 2024-01-13 23:53:01 +01:00
parent d46ee32e06
commit 7ca8ea9d23
4 changed files with 337 additions and 364 deletions


@@ -11,11 +11,6 @@ RUN apt-get update -y \
npm \
&& apt-get clean
COPY s3downloader /s3downloader
ENV S3_URL="https://clickhouse-datasets.s3.amazonaws.com"
ENV DATASETS="hits visits"
# The following is already done in clickhouse/stateless-test
# RUN npm install -g azurite
# RUN npm install tslib


@@ -97,20 +97,346 @@ start
setup_logs_replication
# shellcheck disable=SC2086 # No quotes because I want to split it into words.
/s3downloader --url-prefix "$S3_URL" --dataset-names $DATASETS
chmod 777 -R /var/lib/clickhouse
clickhouse-client --query "SHOW DATABASES"
clickhouse-client --query "CREATE DATABASE datasets"
clickhouse-client --query "ATTACH DATABASE datasets ENGINE = Ordinary"
clickhouse-client --query "
ATTACH TABLE datasets.hits_v1 UUID '78ebf6a1-d987-4579-b3ec-00c1a087b1f3'
(
WatchID UInt64,
JavaEnable UInt8,
Title String,
GoodEvent Int16,
EventTime DateTime,
EventDate Date,
CounterID UInt32,
ClientIP UInt32,
ClientIP6 FixedString(16),
RegionID UInt32,
UserID UInt64,
CounterClass Int8,
OS UInt8,
UserAgent UInt8,
URL String,
Referer String,
URLDomain String,
RefererDomain String,
Refresh UInt8,
IsRobot UInt8,
RefererCategories Array(UInt16),
URLCategories Array(UInt16),
URLRegions Array(UInt32),
RefererRegions Array(UInt32),
ResolutionWidth UInt16,
ResolutionHeight UInt16,
ResolutionDepth UInt8,
FlashMajor UInt8,
FlashMinor UInt8,
FlashMinor2 String,
NetMajor UInt8,
NetMinor UInt8,
UserAgentMajor UInt16,
UserAgentMinor FixedString(2),
CookieEnable UInt8,
JavascriptEnable UInt8,
IsMobile UInt8,
MobilePhone UInt8,
MobilePhoneModel String,
Params String,
IPNetworkID UInt32,
TraficSourceID Int8,
SearchEngineID UInt16,
SearchPhrase String,
AdvEngineID UInt8,
IsArtifical UInt8,
WindowClientWidth UInt16,
WindowClientHeight UInt16,
ClientTimeZone Int16,
ClientEventTime DateTime,
SilverlightVersion1 UInt8,
SilverlightVersion2 UInt8,
SilverlightVersion3 UInt32,
SilverlightVersion4 UInt16,
PageCharset String,
CodeVersion UInt32,
IsLink UInt8,
IsDownload UInt8,
IsNotBounce UInt8,
FUniqID UInt64,
HID UInt32,
IsOldCounter UInt8,
IsEvent UInt8,
IsParameter UInt8,
DontCountHits UInt8,
WithHash UInt8,
HitColor FixedString(1),
UTCEventTime DateTime,
Age UInt8,
Sex UInt8,
Income UInt8,
Interests UInt16,
Robotness UInt8,
GeneralInterests Array(UInt16),
RemoteIP UInt32,
RemoteIP6 FixedString(16),
WindowName Int32,
OpenerName Int32,
HistoryLength Int16,
BrowserLanguage FixedString(2),
BrowserCountry FixedString(2),
SocialNetwork String,
SocialAction String,
HTTPError UInt16,
SendTiming Int32,
DNSTiming Int32,
ConnectTiming Int32,
ResponseStartTiming Int32,
ResponseEndTiming Int32,
FetchTiming Int32,
RedirectTiming Int32,
DOMInteractiveTiming Int32,
DOMContentLoadedTiming Int32,
DOMCompleteTiming Int32,
LoadEventStartTiming Int32,
LoadEventEndTiming Int32,
NSToDOMContentLoadedTiming Int32,
FirstPaintTiming Int32,
RedirectCount Int8,
SocialSourceNetworkID UInt8,
SocialSourcePage String,
ParamPrice Int64,
ParamOrderID String,
ParamCurrency FixedString(3),
ParamCurrencyID UInt16,
GoalsReached Array(UInt32),
OpenstatServiceName String,
OpenstatCampaignID String,
OpenstatAdID String,
OpenstatSourceID String,
UTMSource String,
UTMMedium String,
UTMCampaign String,
UTMContent String,
UTMTerm String,
FromTag String,
HasGCLID UInt8,
RefererHash UInt64,
URLHash UInt64,
CLID UInt32,
YCLID UInt64,
ShareService String,
ShareURL String,
ShareTitle String,
ParsedParams.Key1 Array(String),
ParsedParams.Key2 Array(String),
ParsedParams.Key3 Array(String),
ParsedParams.Key4 Array(String),
ParsedParams.Key5 Array(String),
ParsedParams.ValueDouble Array(Float64),
IslandID FixedString(16),
RequestNum UInt32,
RequestTry UInt8
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(EventDate)
ORDER BY (CounterID, EventDate, intHash32(UserID))
SAMPLE BY intHash32(UserID)
SETTINGS disk = disk(type = cache, path = '/dev/shm/clickhouse/', max_size = '16G',
disk = disk(type = web, endpoint = 'https://clickhouse-datasets-web.s3.us-east-1.amazonaws.com/'));
"
service clickhouse-server restart
# Wait for server to start accepting connections
for _ in {1..120}; do
    clickhouse-client --query "SELECT 1" && break
    sleep 1
done
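
(The loop above is the script's readiness gate: after the restart, queries fail until the server is back up, so it retries SELECT 1 once a second for up to two minutes. A minimal Python sketch of the same poll, not part of this commit, assuming clickhouse-client is on PATH:)

import subprocess
import time

def wait_for_clickhouse(attempts=120, delay=1.0):
    # Retry a trivial query until the server accepts connections.
    for _ in range(attempts):
        result = subprocess.run(
            ["clickhouse-client", "--query", "SELECT 1"],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
        if result.returncode == 0:
            return True
        time.sleep(delay)
    return False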
clickhouse-client --query "
ATTACH TABLE datasets.visits_v1 UUID '5131f834-711f-4168-98a5-968b691a104b'
(
CounterID UInt32,
StartDate Date,
Sign Int8,
IsNew UInt8,
VisitID UInt64,
UserID UInt64,
StartTime DateTime,
Duration UInt32,
UTCStartTime DateTime,
PageViews Int32,
Hits Int32,
IsBounce UInt8,
Referer String,
StartURL String,
RefererDomain String,
StartURLDomain String,
EndURL String,
LinkURL String,
IsDownload UInt8,
TraficSourceID Int8,
SearchEngineID UInt16,
SearchPhrase String,
AdvEngineID UInt8,
PlaceID Int32,
RefererCategories Array(UInt16),
URLCategories Array(UInt16),
URLRegions Array(UInt32),
RefererRegions Array(UInt32),
IsYandex UInt8,
GoalReachesDepth Int32,
GoalReachesURL Int32,
GoalReachesAny Int32,
SocialSourceNetworkID UInt8,
SocialSourcePage String,
MobilePhoneModel String,
ClientEventTime DateTime,
RegionID UInt32,
ClientIP UInt32,
ClientIP6 FixedString(16),
RemoteIP UInt32,
RemoteIP6 FixedString(16),
IPNetworkID UInt32,
SilverlightVersion3 UInt32,
CodeVersion UInt32,
ResolutionWidth UInt16,
ResolutionHeight UInt16,
UserAgentMajor UInt16,
UserAgentMinor UInt16,
WindowClientWidth UInt16,
WindowClientHeight UInt16,
SilverlightVersion2 UInt8,
SilverlightVersion4 UInt16,
FlashVersion3 UInt16,
FlashVersion4 UInt16,
ClientTimeZone Int16,
OS UInt8,
UserAgent UInt8,
ResolutionDepth UInt8,
FlashMajor UInt8,
FlashMinor UInt8,
NetMajor UInt8,
NetMinor UInt8,
MobilePhone UInt8,
SilverlightVersion1 UInt8,
Age UInt8,
Sex UInt8,
Income UInt8,
JavaEnable UInt8,
CookieEnable UInt8,
JavascriptEnable UInt8,
IsMobile UInt8,
BrowserLanguage UInt16,
BrowserCountry UInt16,
Interests UInt16,
Robotness UInt8,
GeneralInterests Array(UInt16),
Params Array(String),
Goals.ID Array(UInt32),
Goals.Serial Array(UInt32),
Goals.EventTime Array(DateTime),
Goals.Price Array(Int64),
Goals.OrderID Array(String),
Goals.CurrencyID Array(UInt32),
WatchIDs Array(UInt64),
ParamSumPrice Int64,
ParamCurrency FixedString(3),
ParamCurrencyID UInt16,
ClickLogID UInt64,
ClickEventID Int32,
ClickGoodEvent Int32,
ClickEventTime DateTime,
ClickPriorityID Int32,
ClickPhraseID Int32,
ClickPageID Int32,
ClickPlaceID Int32,
ClickTypeID Int32,
ClickResourceID Int32,
ClickCost UInt32,
ClickClientIP UInt32,
ClickDomainID UInt32,
ClickURL String,
ClickAttempt UInt8,
ClickOrderID UInt32,
ClickBannerID UInt32,
ClickMarketCategoryID UInt32,
ClickMarketPP UInt32,
ClickMarketCategoryName String,
ClickMarketPPName String,
ClickAWAPSCampaignName String,
ClickPageName String,
ClickTargetType UInt16,
ClickTargetPhraseID UInt64,
ClickContextType UInt8,
ClickSelectType Int8,
ClickOptions String,
ClickGroupBannerID Int32,
OpenstatServiceName String,
OpenstatCampaignID String,
OpenstatAdID String,
OpenstatSourceID String,
UTMSource String,
UTMMedium String,
UTMCampaign String,
UTMContent String,
UTMTerm String,
FromTag String,
HasGCLID UInt8,
FirstVisit DateTime,
PredLastVisit Date,
LastVisit Date,
TotalVisits UInt32,
TraficSource.ID Array(Int8),
TraficSource.SearchEngineID Array(UInt16),
TraficSource.AdvEngineID Array(UInt8),
TraficSource.PlaceID Array(UInt16),
TraficSource.SocialSourceNetworkID Array(UInt8),
TraficSource.Domain Array(String),
TraficSource.SearchPhrase Array(String),
TraficSource.SocialSourcePage Array(String),
Attendance FixedString(16),
CLID UInt32,
YCLID UInt64,
NormalizedRefererHash UInt64,
SearchPhraseHash UInt64,
RefererDomainHash UInt64,
NormalizedStartURLHash UInt64,
StartURLDomainHash UInt64,
NormalizedEndURLHash UInt64,
TopLevelDomain UInt64,
URLScheme UInt64,
OpenstatServiceNameHash UInt64,
OpenstatCampaignIDHash UInt64,
OpenstatAdIDHash UInt64,
OpenstatSourceIDHash UInt64,
UTMSourceHash UInt64,
UTMMediumHash UInt64,
UTMCampaignHash UInt64,
UTMContentHash UInt64,
UTMTermHash UInt64,
FromHash UInt64,
WebVisorEnabled UInt8,
WebVisorActivity UInt32,
ParsedParams.Key1 Array(String),
ParsedParams.Key2 Array(String),
ParsedParams.Key3 Array(String),
ParsedParams.Key4 Array(String),
ParsedParams.Key5 Array(String),
ParsedParams.ValueDouble Array(Float64),
Market.Type Array(UInt8),
Market.GoalID Array(UInt32),
Market.OrderID Array(String),
Market.OrderPrice Array(Int64),
Market.PP Array(UInt32),
Market.DirectPlaceID Array(UInt32),
Market.DirectOrderID Array(UInt32),
Market.DirectBannerID Array(UInt32),
Market.GoodID Array(String),
Market.GoodName Array(String),
Market.GoodQuantity Array(Int32),
Market.GoodPrice Array(Int64),
IslandID FixedString(16)
)
ENGINE = CollapsingMergeTree(Sign)
PARTITION BY toYYYYMM(StartDate)
ORDER BY (CounterID, StartDate, intHash32(UserID), VisitID)
SAMPLE BY intHash32(UserID)
SETTINGS disk = disk(type = cache, path = '/dev/shm/clickhouse/', max_size = '16G',
disk = disk(type = web, endpoint = 'https://clickhouse-datasets-web.s3.us-east-1.amazonaws.com/'));
"
clickhouse-client --query "SHOW TABLES FROM datasets"


@@ -1,126 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import sys
import time
import tarfile
import logging
import argparse
import requests
import tempfile
DEFAULT_URL = "https://clickhouse-datasets.s3.amazonaws.com"
AVAILABLE_DATASETS = {
    "hits": "hits_v1.tar",
    "visits": "visits_v1.tar",
}
RETRIES_COUNT = 5


def _get_temp_file_name():
    return os.path.join(
        tempfile._get_default_tempdir(), next(tempfile._get_candidate_names())
    )


def build_url(base_url, dataset):
    return os.path.join(base_url, dataset, "partitions", AVAILABLE_DATASETS[dataset])


def download_with_progress(url, path):
    logging.info("Downloading from %s to temp path %s", url, path)
    for i in range(RETRIES_COUNT):
        try:
            with open(path, "wb") as f:
                response = requests.get(url, stream=True)
                response.raise_for_status()
                total_length = response.headers.get("content-length")
                if total_length is None or int(total_length) == 0:
                    logging.info(
                        "No content-length, will download file without progress"
                    )
                    f.write(response.content)
                else:
                    dl = 0
                    total_length = int(total_length)
                    logging.info("Content length is %ld bytes", total_length)
                    for data in response.iter_content(chunk_size=4096):
                        dl += len(data)
                        f.write(data)
                        if sys.stdout.isatty():
                            done = int(50 * dl / total_length)
                            percent = int(100 * float(dl) / total_length)
                            sys.stdout.write(
                                "\r[{}{}] {}%".format(
                                    "=" * done, " " * (50 - done), percent
                                )
                            )
                            sys.stdout.flush()
            break
        except Exception as ex:
            sys.stdout.write("\n")
            time.sleep(3)
            logging.info("Exception while downloading %s, retry %s", ex, i + 1)
            if os.path.exists(path):
                os.remove(path)
    else:
        raise Exception(
            "Cannot download dataset from {}, all retries exceeded".format(url)
        )
    sys.stdout.write("\n")
    logging.info("Downloading finished")


def unpack_to_clickhouse_directory(tar_path, clickhouse_path):
    logging.info(
        "Will unpack data from temp path %s to clickhouse db %s",
        tar_path,
        clickhouse_path,
    )
    with tarfile.open(tar_path, "r") as comp_file:
        comp_file.extractall(path=clickhouse_path)
    logging.info("Unpack finished")


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    parser = argparse.ArgumentParser(
        description="Simple tool for downloading datasets for clickhouse from S3"
    )
    parser.add_argument(
        "--dataset-names",
        required=True,
        nargs="+",
        choices=list(AVAILABLE_DATASETS.keys()),
    )
    parser.add_argument("--url-prefix", default=DEFAULT_URL)
    parser.add_argument("--clickhouse-data-path", default="/var/lib/clickhouse/")

    args = parser.parse_args()
    datasets = args.dataset_names
    logging.info("Will fetch following datasets: %s", ", ".join(datasets))
    for dataset in datasets:
        logging.info("Processing %s", dataset)
        temp_archive_path = _get_temp_file_name()
        try:
            download_url_for_dataset = build_url(args.url_prefix, dataset)
            download_with_progress(download_url_for_dataset, temp_archive_path)
            unpack_to_clickhouse_directory(temp_archive_path, args.clickhouse_data_path)
        except Exception as ex:
            logging.info("Some exception occurred %s", str(ex))
            raise
        finally:
            logging.info(
                "Will remove downloaded file %s from filesystem if it exists",
                temp_archive_path,
            )
            if os.path.exists(temp_archive_path):
                os.remove(temp_archive_path)
        logging.info("Processing of %s finished", dataset)
    logging.info("Fetch finished, enjoy your tables!")


@@ -1,222 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import logging
import argparse
import tarfile
import math
try:
    from boto.s3.connection import S3Connection
    from boto.s3.key import Key
except ImportError:
    raise ImportError("You have to install boto package 'pip install boto'")


class S3API(object):
    def __init__(self, access_key, secret_access_key, mds_api, mds_url):
        self.connection = S3Connection(
            host=mds_api,
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_access_key,
        )
        self.mds_url = mds_url

    def upload_file(self, bucket_name, file_path, s3_path):
        logging.info("Start uploading file to bucket %s", bucket_name)
        bucket = self.connection.get_bucket(bucket_name)
        key = bucket.initiate_multipart_upload(s3_path)
        logging.info("Will upload to s3 path %s", s3_path)
        chunksize = 1024 * 1024 * 1024  # 1 GB
        filesize = os.stat(file_path).st_size
        logging.info("File size is %s", filesize)
        chunkcount = int(math.ceil(filesize / chunksize))

        def call_back(x, y):
            print("Uploaded {}/{} bytes".format(x, y))

        try:
            for i in range(chunkcount):
                logging.info("Uploading chunk %s of %s", i + 1, chunkcount)
                offset = chunksize * i
                bytes_size = min(chunksize, filesize - offset)
                with open(file_path, "rb") as fp:
                    fp.seek(offset)
                    key.upload_part_from_file(
                        fp=fp, part_num=i + 1, size=bytes_size, cb=call_back, num_cb=100
                    )
            key.complete_upload()
        except Exception as ex:
            key.cancel_upload()
            raise ex
        logging.info("Contents were set")
        return "https://{bucket}.{mds_url}/{path}".format(
            bucket=bucket_name, mds_url=self.mds_url, path=s3_path
        )

    def set_file_contents(self, bucket, local_file_path, s3_file_path):
        key = Key(bucket)
        key.key = s3_file_path
        file_size = os.stat(local_file_path).st_size
        logging.info(
            "Uploading file `%s` to `%s`. Size is %s",
            local_file_path,
            s3_file_path,
            file_size,
        )

        def call_back(x, y):
            print("Uploaded {}/{} bytes".format(x, y))

        key.set_contents_from_filename(local_file_path, cb=call_back)

    def upload_data_for_static_files_disk(self, bucket_name, directory_path, s3_path):
        bucket = self.connection.get_bucket(bucket_name)
        if s3_path.endswith("/"):
            s3_path += "store/"
        else:
            s3_path += "/store/"
        print(s3_path)
        for root, dirs, files in os.walk(directory_path):
            for file in files:
                local_file_path = os.path.join(root, file)
                s3_file = local_file_path[len(directory_path) + 1 :]
                s3_file_path = os.path.join(s3_path, s3_file)
                self.set_file_contents(bucket, local_file_path, s3_file_path)
        logging.info("Uploading finished")
        return "https://{bucket}.{mds_url}/{path}".format(
            bucket=bucket_name, mds_url=self.mds_url, path=s3_path
        )

    def list_bucket_keys(self, bucket_name):
        bucket = self.connection.get_bucket(bucket_name)
        for obj in bucket.get_all_keys():
            print(obj.key)

    def remove_folder_from_bucket(self, bucket_name, folder_path):
        bucket = self.connection.get_bucket(bucket_name)
        for obj in bucket.get_all_keys():
            if obj.key.startswith(folder_path):
                print("Removing " + obj.key)
                obj.delete()


def make_tar_file_for_table(clickhouse_data_path, db_name, table_name, tmp_prefix):
    relative_data_path = os.path.join("data", db_name, table_name)
    relative_meta_path = os.path.join("metadata", db_name, table_name + ".sql")
    path_to_data = os.path.join(clickhouse_data_path, relative_data_path)
    path_to_metadata = os.path.join(clickhouse_data_path, relative_meta_path)
    temporary_file_name = tmp_prefix + "/{tname}.tar".format(tname=table_name)
    with tarfile.open(temporary_file_name, "w") as bundle:
        bundle.add(path_to_data, arcname=relative_data_path)
        bundle.add(path_to_metadata, arcname=relative_meta_path)
    return temporary_file_name


USAGE_EXAMPLES = """
examples:
\t./s3uploader --dataset-name some_ds --access-key-id XXX --secret-access-key YYY --clickhouse-data-path /opt/clickhouse/ --table-name default.some_tbl --bucket-name some-bucket
\t./s3uploader --dataset-name some_ds --access-key-id XXX --secret-access-key YYY --file-path some_ds.tsv.xz --bucket-name some-bucket --s3-path /path/to/
"""

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")
    parser = argparse.ArgumentParser(
        description="Simple tool for uploading datasets to clickhouse S3",
        usage="%(prog)s [options] {}".format(USAGE_EXAMPLES),
    )
    parser.add_argument("--s3-api-url", default="s3.amazonaws.com")
    parser.add_argument("--s3-common-url", default="s3.amazonaws.com")
    parser.add_argument("--bucket-name", default="clickhouse-datasets")
    parser.add_argument(
        "--dataset-name",
        required=True,
        help="Name of dataset, will be used in uploaded path",
    )
    parser.add_argument("--access-key-id", required=True)
    parser.add_argument("--secret-access-key", required=True)
    parser.add_argument(
        "--clickhouse-data-path",
        default="/var/lib/clickhouse/",
        help="Path to clickhouse database on filesystem",
    )
    parser.add_argument("--s3-path", help="Path in s3, where to upload file")
    parser.add_argument(
        "--tmp-prefix", default="/tmp", help="Prefix to store temporary downloaded file"
    )
    data_group = parser.add_mutually_exclusive_group(required=True)
    table_name_argument = data_group.add_argument(
        "--table-name",
        help="Name of table with database, if you are uploading partitions",
    )
    data_group.add_argument("--file-path", help="Name of file, if you are uploading")
    data_group.add_argument(
        "--directory-path", help="Path to directory with files to upload"
    )
    data_group.add_argument(
        "--list-directory", help="List s3 directory by --directory-path"
    )
    data_group.add_argument(
        "--remove-directory", help="Remove s3 directory by --directory-path"
    )
    args = parser.parse_args()

    if args.table_name is not None and args.clickhouse_data_path is None:
        raise argparse.ArgumentError(
            table_name_argument,
            "You should specify --clickhouse-data-path to upload --table",
        )

    s3_conn = S3API(
        args.access_key_id, args.secret_access_key, args.s3_api_url, args.s3_common_url
    )

    file_path = ""
    directory_path = args.directory_path
    s3_path = args.s3_path
    if args.list_directory:
        s3_conn.list_bucket_keys(args.bucket_name)
    elif args.remove_directory:
        print("Removing s3 path: " + args.remove_directory)
        s3_conn.remove_folder_from_bucket(args.bucket_name, args.remove_directory)
    elif args.directory_path is not None:
        url = s3_conn.upload_data_for_static_files_disk(
            args.bucket_name, directory_path, s3_path
        )
        logging.info("Data uploaded: %s", url)
    else:
        if args.table_name is not None:
            if "." not in args.table_name:
                db_name = "default"
                table_name = args.table_name
            else:
                db_name, table_name = args.table_name.split(".")
            file_path = make_tar_file_for_table(
                args.clickhouse_data_path, db_name, table_name, args.tmp_prefix
            )
        else:
            file_path = args.file_path

        if "tsv" in file_path:
            s3_path = os.path.join(
                args.dataset_name, "tsv", os.path.basename(file_path)
            )
        if args.table_name is not None:
            s3_path = os.path.join(
                args.dataset_name, "partitions", os.path.basename(file_path)
            )
        elif args.s3_path is not None:
            s3_path = os.path.join(
                args.dataset_name, args.s3_path, os.path.basename(file_path)
            )
        else:
            raise Exception("Don't know s3-path to upload")

        url = s3_conn.upload_file(args.bucket_name, file_path, s3_path)
        logging.info("Data uploaded: %s", url)