mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-04 21:42:39 +00:00
880 lines
30 KiB
Python
880 lines
30 KiB
Python
import io
|
|
import logging
|
|
import os
|
|
import random
|
|
import time
|
|
|
|
import pytest
|
|
from helpers.client import QueryRuntimeException
|
|
from helpers.cluster import ClickHouseCluster, ClickHouseInstance
|
|
import json
|
|
|
|
"""
|
|
export CLICKHOUSE_TESTS_SERVER_BIN_PATH=/home/sergey/vkr/ClickHouse/build/programs/clickhouse-server
|
|
export CLICKHOUSE_TESTS_CLIENT_BIN_PATH=/home/sergey/vkr/ClickHouse/build/programs/clickhouse-client
|
|
export CLICKHOUSE_TESTS_ODBC_BRIDGE_BIN_PATH=/home/sergey/vkr/ClickHouse/build/programs/clickhouse-odbc-bridge
|
|
export CLICKHOUSE_TESTS_BASE_CONFIG_DIR=/home/sergey/vkr/ClickHouse/programs/server
|
|
|
|
"""
|
|
|
|
|
|
def prepare_s3_bucket(started_cluster):
|
|
# Allows read-write access for bucket without authorization.
|
|
bucket_read_write_policy = {
|
|
"Version": "2012-10-17",
|
|
"Statement": [
|
|
{
|
|
"Sid": "",
|
|
"Effect": "Allow",
|
|
"Principal": {"AWS": "*"},
|
|
"Action": "s3:GetBucketLocation",
|
|
"Resource": "arn:aws:s3:::root",
|
|
},
|
|
{
|
|
"Sid": "",
|
|
"Effect": "Allow",
|
|
"Principal": {"AWS": "*"},
|
|
"Action": "s3:ListBucket",
|
|
"Resource": "arn:aws:s3:::root",
|
|
},
|
|
{
|
|
"Sid": "",
|
|
"Effect": "Allow",
|
|
"Principal": {"AWS": "*"},
|
|
"Action": "s3:GetObject",
|
|
"Resource": "arn:aws:s3:::root/*",
|
|
},
|
|
{
|
|
"Sid": "",
|
|
"Effect": "Allow",
|
|
"Principal": {"AWS": "*"},
|
|
"Action": "s3:PutObject",
|
|
"Resource": "arn:aws:s3:::root/*",
|
|
},
|
|
{
|
|
"Sid": "",
|
|
"Effect": "Allow",
|
|
"Principal": {"AWS": "*"},
|
|
"Action": "s3:DeleteObject",
|
|
"Resource": "arn:aws:s3:::root/*",
|
|
},
|
|
],
|
|
}
|
|
|
|
minio_client = started_cluster.minio_client
|
|
minio_client.set_bucket_policy(
|
|
started_cluster.minio_bucket, json.dumps(bucket_read_write_policy)
|
|
)
|
|
|
|
started_cluster.minio_restricted_bucket = "{}-with-auth".format(
|
|
started_cluster.minio_bucket
|
|
)
|
|
if minio_client.bucket_exists(started_cluster.minio_restricted_bucket):
|
|
minio_client.remove_bucket(started_cluster.minio_restricted_bucket)
|
|
|
|
minio_client.make_bucket(started_cluster.minio_restricted_bucket)
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def s3_queue_setup_teardown(started_cluster):
|
|
instance = started_cluster.instances["instance"]
|
|
instance_2 = started_cluster.instances["instance2"]
|
|
|
|
instance.query("DROP DATABASE IF EXISTS test; CREATE DATABASE test;")
|
|
instance_2.query("DROP DATABASE IF EXISTS test; CREATE DATABASE test;")
|
|
|
|
minio = started_cluster.minio_client
|
|
objects = list(
|
|
minio.list_objects(started_cluster.minio_restricted_bucket, recursive=True)
|
|
)
|
|
for obj in objects:
|
|
minio.remove_object(started_cluster.minio_restricted_bucket, obj.object_name)
|
|
yield # run test
|
|
|
|
|
|
MINIO_INTERNAL_PORT = 9001
|
|
AVAILABLE_MODES = ["unordered", "ordered"]
|
|
AUTH = "'minio','minio123',"
|
|
|
|
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
|
|
|
|
|
|
def put_s3_file_content(started_cluster, bucket, filename, data):
|
|
buf = io.BytesIO(data)
|
|
started_cluster.minio_client.put_object(bucket, filename, buf, len(data))
|
|
|
|
|
|
def generate_random_files(
|
|
count, prefix, cluster, bucket, column_num=3, row_num=10, start_ind=0
|
|
):
|
|
total_values = []
|
|
to_generate = [
|
|
(f"{prefix}/test_{i}.csv", i) for i in range(start_ind, start_ind + count)
|
|
]
|
|
to_generate.sort(key=lambda x: x[0])
|
|
|
|
for filename, i in to_generate:
|
|
rand_values = [
|
|
[random.randint(0, 50) for _ in range(column_num)] for _ in range(row_num)
|
|
]
|
|
total_values += rand_values
|
|
values_csv = (
|
|
"\n".join((",".join(map(str, row)) for row in rand_values)) + "\n"
|
|
).encode()
|
|
put_s3_file_content(cluster, bucket, filename, values_csv)
|
|
return total_values
|
|
|
|
|
|
# Returns content of given S3 file as string.
|
|
def get_s3_file_content(started_cluster, bucket, filename, decode=True):
|
|
# type: (ClickHouseCluster, str, str, bool) -> str
|
|
|
|
data = started_cluster.minio_client.get_object(bucket, filename)
|
|
data_str = b""
|
|
for chunk in data.stream():
|
|
data_str += chunk
|
|
if decode:
|
|
return data_str.decode()
|
|
return data_str
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
def started_cluster():
|
|
try:
|
|
cluster = ClickHouseCluster(__file__)
|
|
cluster.add_instance(
|
|
"instance",
|
|
user_configs=["configs/users.xml"],
|
|
with_minio=True,
|
|
with_zookeeper=True,
|
|
main_configs=["configs/defaultS3.xml", "configs/named_collections.xml"],
|
|
)
|
|
cluster.add_instance(
|
|
"instance2",
|
|
user_configs=["configs/users.xml"],
|
|
with_minio=True,
|
|
with_zookeeper=True,
|
|
main_configs=["configs/defaultS3.xml", "configs/named_collections.xml"],
|
|
)
|
|
|
|
logging.info("Starting cluster...")
|
|
cluster.start()
|
|
logging.info("Cluster started")
|
|
|
|
prepare_s3_bucket(cluster)
|
|
yield cluster
|
|
finally:
|
|
cluster.shutdown()
|
|
|
|
|
|
def run_query(instance, query, stdin=None, settings=None):
|
|
# type: (ClickHouseInstance, str, object, dict) -> str
|
|
|
|
logging.info("Running query '{}'...".format(query))
|
|
result = instance.query(query, stdin=stdin, settings=settings)
|
|
logging.info("Query finished")
|
|
|
|
return result
|
|
|
|
|
|
@pytest.mark.parametrize("mode", AVAILABLE_MODES)
|
|
def test_delete_after_processing(started_cluster, mode):
|
|
prefix = "delete"
|
|
bucket = started_cluster.minio_bucket
|
|
instance = started_cluster.instances["instance"]
|
|
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
|
|
|
|
total_values = generate_random_files(5, prefix, started_cluster, bucket)
|
|
instance.query(
|
|
f"""
|
|
DROP TABLE IF EXISTS test.s3_queue;
|
|
CREATE TABLE test.s3_queue ({table_format})
|
|
ENGINE = S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
|
|
SETTINGS
|
|
mode = '{mode}',
|
|
keeper_path = '/clickhouse/test_delete_{mode}',
|
|
s3queue_loading_retries = 3,
|
|
after_processing='delete';
|
|
"""
|
|
)
|
|
|
|
get_query = f"SELECT * FROM test.s3_queue ORDER BY column1, column2, column3"
|
|
assert [
|
|
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
|
|
] == sorted(total_values, key=lambda x: (x[0], x[1], x[2]))
|
|
minio = started_cluster.minio_client
|
|
objects = list(minio.list_objects(started_cluster.minio_bucket, recursive=True))
|
|
assert len(objects) == 0
|
|
|
|
|
|
@pytest.mark.parametrize("mode", AVAILABLE_MODES)
|
|
def test_failed_retry(started_cluster, mode):
|
|
bucket = started_cluster.minio_restricted_bucket
|
|
instance = started_cluster.instances["instance"]
|
|
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
|
|
|
|
values = [
|
|
["failed", 1, 1],
|
|
]
|
|
values_csv = (
|
|
"\n".join((",".join(map(str, row)) for row in values)) + "\n"
|
|
).encode()
|
|
filename = f"test.csv"
|
|
put_s3_file_content(started_cluster, bucket, filename, values_csv)
|
|
|
|
instance.query(
|
|
f"""
|
|
DROP TABLE IF EXISTS test.s3_queue;
|
|
CREATE TABLE test.s3_queue ({table_format})
|
|
ENGINE = S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/*', {AUTH}'CSV')
|
|
SETTINGS
|
|
mode = '{mode}',
|
|
keeper_path = '/clickhouse/select_failed_retry_{mode}',
|
|
s3queue_loading_retries = 3;
|
|
"""
|
|
)
|
|
|
|
# first try
|
|
get_query = f"SELECT * FROM test.s3_queue"
|
|
assert [
|
|
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
|
|
] == []
|
|
# second try
|
|
assert [
|
|
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
|
|
] == []
|
|
# upload correct file
|
|
values = [
|
|
[1, 1, 1],
|
|
]
|
|
values_csv = (
|
|
"\n".join((",".join(map(str, row)) for row in values)) + "\n"
|
|
).encode()
|
|
put_s3_file_content(started_cluster, bucket, filename, values_csv)
|
|
|
|
assert [
|
|
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
|
|
] == values
|
|
|
|
assert [
|
|
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
|
|
] == []
|
|
|
|
|
|
@pytest.mark.parametrize("mode", AVAILABLE_MODES)
|
|
def test_direct_select_file(started_cluster, mode):
|
|
auth = "'minio','minio123',"
|
|
bucket = started_cluster.minio_restricted_bucket
|
|
instance = started_cluster.instances["instance"]
|
|
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
|
|
values = [
|
|
[12549, 2463, 19893],
|
|
[64021, 38652, 66703],
|
|
[81611, 39650, 83516],
|
|
]
|
|
values_csv = (
|
|
"\n".join((",".join(map(str, row)) for row in values)) + "\n"
|
|
).encode()
|
|
filename = f"test.csv"
|
|
put_s3_file_content(started_cluster, bucket, filename, values_csv)
|
|
instance.query(
|
|
"""
|
|
DROP TABLE IF EXISTS test.s3_queue;
|
|
DROP TABLE IF EXISTS test.s3_queue_2;
|
|
DROP TABLE IF EXISTS test.s3_queue_3;
|
|
"""
|
|
)
|
|
|
|
instance.query(
|
|
f"""
|
|
CREATE TABLE test.s3_queue ({table_format})
|
|
ENGINE = S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/*', {auth}'CSV')
|
|
SETTINGS
|
|
mode = '{mode}',
|
|
keeper_path = '/clickhouse/select_{mode}'
|
|
"""
|
|
)
|
|
|
|
get_query = f"SELECT * FROM test.s3_queue"
|
|
assert [
|
|
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
|
|
] == values
|
|
|
|
instance.query(
|
|
f"""
|
|
CREATE TABLE test.s3_queue_2 ({table_format})
|
|
ENGINE = S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/*', {auth}'CSV')
|
|
SETTINGS
|
|
mode = '{mode}',
|
|
keeper_path = '/clickhouse/select_{mode}'
|
|
"""
|
|
)
|
|
|
|
get_query = f"SELECT * FROM test.s3_queue"
|
|
assert [
|
|
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
|
|
] == []
|
|
# New table with same zookeeper path
|
|
get_query = f"SELECT * FROM test.s3_queue_2"
|
|
assert [
|
|
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
|
|
] == []
|
|
# New table with different zookeeper path
|
|
instance.query(
|
|
f"""
|
|
CREATE TABLE test.s3_queue_3 ({table_format})
|
|
ENGINE = S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/*', {auth}'CSV')
|
|
SETTINGS
|
|
mode = '{mode}',
|
|
keeper_path='/clickhouse/select_{mode}_2'
|
|
"""
|
|
)
|
|
get_query = f"SELECT * FROM test.s3_queue_3"
|
|
assert [
|
|
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
|
|
] == values
|
|
|
|
values = [
|
|
[1, 1, 1],
|
|
]
|
|
values_csv = (
|
|
"\n".join((",".join(map(str, row)) for row in values)) + "\n"
|
|
).encode()
|
|
filename = f"t.csv"
|
|
put_s3_file_content(started_cluster, bucket, filename, values_csv)
|
|
|
|
get_query = f"SELECT * FROM test.s3_queue_3"
|
|
if mode == "unordered":
|
|
assert [
|
|
list(map(int, l.split()))
|
|
for l in run_query(instance, get_query).splitlines()
|
|
] == values
|
|
elif mode == "ordered":
|
|
assert [
|
|
list(map(int, l.split()))
|
|
for l in run_query(instance, get_query).splitlines()
|
|
] == []
|
|
|
|
|
|
@pytest.mark.parametrize("mode", AVAILABLE_MODES)
|
|
def test_direct_select_multiple_files(started_cluster, mode):
|
|
prefix = f"multiple_files_{mode}"
|
|
bucket = started_cluster.minio_restricted_bucket
|
|
instance = started_cluster.instances["instance"]
|
|
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
|
|
instance.query("drop table if exists test.s3_queue")
|
|
instance.query(
|
|
f"""
|
|
CREATE TABLE test.s3_queue ({table_format})
|
|
ENGINE = S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
|
|
SETTINGS
|
|
mode = '{mode}',
|
|
keeper_path = '/clickhouse/select_multiple_{mode}'
|
|
"""
|
|
)
|
|
|
|
for i in range(5):
|
|
rand_values = [[random.randint(0, 50) for _ in range(3)] for _ in range(10)]
|
|
|
|
values_csv = (
|
|
"\n".join((",".join(map(str, row)) for row in rand_values)) + "\n"
|
|
).encode()
|
|
filename = f"{prefix}/test_{i}.csv"
|
|
put_s3_file_content(started_cluster, bucket, filename, values_csv)
|
|
|
|
get_query = f"SELECT * FROM test.s3_queue"
|
|
assert [
|
|
list(map(int, l.split()))
|
|
for l in run_query(instance, get_query).splitlines()
|
|
] == rand_values
|
|
|
|
total_values = generate_random_files(
|
|
4, prefix, started_cluster, bucket, start_ind=5
|
|
)
|
|
get_query = f"SELECT * FROM test.s3_queue"
|
|
assert {
|
|
tuple(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
|
|
} == set([tuple(i) for i in total_values])
|
|
|
|
|
|
@pytest.mark.parametrize("mode", AVAILABLE_MODES)
|
|
def test_streaming_to_view_(started_cluster, mode):
|
|
prefix = f"streaming_files_{mode}"
|
|
bucket = started_cluster.minio_restricted_bucket
|
|
instance = started_cluster.instances["instance"]
|
|
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
|
|
|
|
total_values = generate_random_files(10, prefix, started_cluster, bucket)
|
|
instance.query(
|
|
f"""
|
|
DROP TABLE IF EXISTS test.s3_queue_persistent;
|
|
DROP TABLE IF EXISTS test.s3_queue;
|
|
DROP TABLE IF EXISTS test.persistent_s3_queue_mv;
|
|
|
|
CREATE TABLE test.s3_queue_persistent ({table_format})
|
|
ENGINE = MergeTree()
|
|
ORDER BY column1;
|
|
|
|
CREATE TABLE test.s3_queue ({table_format})
|
|
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
|
|
SETTINGS
|
|
mode = '{mode}',
|
|
keeper_path = '/clickhouse/view_{mode}';
|
|
|
|
CREATE MATERIALIZED VIEW test.persistent_s3_queue_mv TO test.s3_queue_persistent AS
|
|
SELECT
|
|
*
|
|
FROM test.s3_queue;
|
|
"""
|
|
)
|
|
expected_values = set([tuple(i) for i in total_values])
|
|
for i in range(10):
|
|
get_query = f"SELECT * FROM test.persistent_s3_queue_mv"
|
|
|
|
selected_values = {
|
|
tuple(map(int, l.split()))
|
|
for l in run_query(instance, get_query).splitlines()
|
|
}
|
|
if selected_values != expected_values:
|
|
time.sleep(1)
|
|
else:
|
|
break
|
|
|
|
assert selected_values == expected_values
|
|
|
|
|
|
@pytest.mark.parametrize("mode", AVAILABLE_MODES)
|
|
def test_streaming_to_many_views(started_cluster, mode):
|
|
prefix = f"streaming_files_{mode}"
|
|
bucket = started_cluster.minio_restricted_bucket
|
|
instance = started_cluster.instances["instance"]
|
|
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
|
|
retry_cnt = 10
|
|
|
|
instance.query(
|
|
f"""
|
|
DROP TABLE IF EXISTS test.s3_queue_persistent;
|
|
DROP TABLE IF EXISTS test.s3_queue_persistent_2;
|
|
DROP TABLE IF EXISTS test.s3_queue_persistent_3;
|
|
DROP TABLE IF EXISTS test.s3_queue;
|
|
DROP TABLE IF EXISTS test.persistent_s3_queue_mv;
|
|
DROP TABLE IF EXISTS test.persistent_s3_queue_mv_2;
|
|
DROP TABLE IF EXISTS test.persistent_s3_queue_mv_3;
|
|
|
|
|
|
CREATE TABLE test.s3_queue_persistent ({table_format})
|
|
ENGINE = MergeTree()
|
|
ORDER BY column1;
|
|
|
|
CREATE TABLE test.s3_queue_persistent_2 ({table_format})
|
|
ENGINE = MergeTree()
|
|
ORDER BY column1;
|
|
|
|
CREATE TABLE test.s3_queue_persistent_3 ({table_format})
|
|
ENGINE = MergeTree()
|
|
ORDER BY column1;
|
|
|
|
CREATE TABLE test.s3_queue ({table_format})
|
|
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
|
|
SETTINGS
|
|
mode = '{mode}',
|
|
keeper_path = '/clickhouse/multiple_view_{mode}';
|
|
|
|
CREATE MATERIALIZED VIEW test.persistent_s3_queue_mv TO test.s3_queue_persistent AS
|
|
SELECT
|
|
*
|
|
FROM test.s3_queue;
|
|
|
|
CREATE MATERIALIZED VIEW test.persistent_s3_queue_mv_2 TO test.s3_queue_persistent_2 AS
|
|
SELECT
|
|
*
|
|
FROM test.s3_queue;
|
|
|
|
CREATE MATERIALIZED VIEW test.persistent_s3_queue_mv_3 TO test.s3_queue_persistent_3 AS
|
|
SELECT
|
|
*
|
|
FROM test.s3_queue;
|
|
"""
|
|
)
|
|
total_values = generate_random_files(5, prefix, started_cluster, bucket)
|
|
expected_values = set([tuple(i) for i in total_values])
|
|
|
|
for i in range(retry_cnt):
|
|
retry = False
|
|
for get_query in [
|
|
f"SELECT * FROM test.s3_queue_persistent",
|
|
f"SELECT * FROM test.s3_queue_persistent_2",
|
|
f"SELECT * FROM test.s3_queue_persistent_3",
|
|
]:
|
|
selected_values = {
|
|
tuple(map(int, l.split()))
|
|
for l in run_query(instance, get_query).splitlines()
|
|
}
|
|
if i == retry_cnt - 1:
|
|
assert selected_values == expected_values
|
|
if selected_values != expected_values:
|
|
retry = True
|
|
break
|
|
if retry:
|
|
time.sleep(1)
|
|
else:
|
|
break
|
|
|
|
|
|
def test_multiple_tables_meta_mismatch(started_cluster):
|
|
prefix = f"test_meta"
|
|
bucket = started_cluster.minio_restricted_bucket
|
|
instance = started_cluster.instances["instance"]
|
|
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
|
|
|
|
instance.query(
|
|
f"""
|
|
DROP TABLE IF EXISTS test.s3_queue;
|
|
|
|
CREATE TABLE test.s3_queue ({table_format})
|
|
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
|
|
SETTINGS
|
|
mode = 'ordered',
|
|
keeper_path = '/clickhouse/test_meta';
|
|
"""
|
|
)
|
|
# check mode
|
|
failed = False
|
|
try:
|
|
instance.query(
|
|
f"""
|
|
CREATE TABLE test.s3_queue_copy ({table_format})
|
|
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
|
|
SETTINGS
|
|
mode = 'unordered',
|
|
keeper_path = '/clickhouse/test_meta';
|
|
"""
|
|
)
|
|
except QueryRuntimeException as e:
|
|
assert "Existing table metadata in ZooKeeper differs in engine mode" in str(e)
|
|
failed = True
|
|
assert failed is True
|
|
|
|
# check columns
|
|
table_format_copy = table_format + ", column4 UInt32"
|
|
try:
|
|
instance.query(
|
|
f"""
|
|
CREATE TABLE test.s3_queue_copy ({table_format_copy})
|
|
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
|
|
SETTINGS
|
|
mode = 'ordered',
|
|
keeper_path = '/clickhouse/test_meta';
|
|
"""
|
|
)
|
|
except QueryRuntimeException as e:
|
|
assert (
|
|
"Table columns structure in ZooKeeper is different from local table structure"
|
|
in str(e)
|
|
)
|
|
failed = True
|
|
|
|
assert failed is True
|
|
|
|
# check format
|
|
try:
|
|
instance.query(
|
|
f"""
|
|
CREATE TABLE test.s3_queue_copy ({table_format})
|
|
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'TSV')
|
|
SETTINGS
|
|
mode = 'ordered',
|
|
keeper_path = '/clickhouse/test_meta';
|
|
"""
|
|
)
|
|
except QueryRuntimeException as e:
|
|
assert "Existing table metadata in ZooKeeper differs in format name" in str(e)
|
|
failed = True
|
|
assert failed is True
|
|
|
|
# create working engine
|
|
instance.query(
|
|
f"""
|
|
CREATE TABLE test.s3_queue_copy ({table_format})
|
|
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
|
|
SETTINGS
|
|
mode = 'ordered',
|
|
keeper_path = '/clickhouse/test_meta';
|
|
"""
|
|
)
|
|
|
|
|
|
def test_max_set_age(started_cluster):
|
|
files_to_generate = 10
|
|
max_age = 1
|
|
prefix = f"test_multiple"
|
|
bucket = started_cluster.minio_restricted_bucket
|
|
instance = started_cluster.instances["instance"]
|
|
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
|
|
|
|
instance.query(
|
|
f"""
|
|
DROP TABLE IF EXISTS test.s3_queue;
|
|
|
|
CREATE TABLE test.s3_queue ({table_format})
|
|
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
|
|
SETTINGS
|
|
mode = 'unordered',
|
|
keeper_path = '/clickhouse/test_set_age',
|
|
s3queue_tracked_files_limit = 10,
|
|
s3queue_tracked_file_ttl_sec = {max_age};
|
|
"""
|
|
)
|
|
|
|
total_values = generate_random_files(
|
|
files_to_generate, prefix, started_cluster, bucket, row_num=1
|
|
)
|
|
get_query = f"SELECT * FROM test.s3_queue"
|
|
res1 = [
|
|
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
|
|
]
|
|
assert res1 == total_values
|
|
time.sleep(max_age + 1)
|
|
|
|
get_query = f"SELECT * FROM test.s3_queue"
|
|
res1 = [
|
|
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
|
|
]
|
|
assert res1 == total_values
|
|
|
|
|
|
@pytest.mark.parametrize("mode", AVAILABLE_MODES)
|
|
def test_multiple_tables_streaming_sync(started_cluster, mode):
|
|
files_to_generate = 300
|
|
poll_size = 30
|
|
prefix = f"test_multiple_{mode}"
|
|
bucket = started_cluster.minio_restricted_bucket
|
|
instance = started_cluster.instances["instance"]
|
|
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
|
|
|
|
instance.query(
|
|
f"""
|
|
DROP TABLE IF EXISTS test.s3_queue;
|
|
DROP TABLE IF EXISTS test.s3_queue_copy;
|
|
DROP TABLE IF EXISTS test.s3_queue_copy_2;
|
|
|
|
DROP TABLE IF EXISTS test.s3_queue_persistent;
|
|
DROP TABLE IF EXISTS test.s3_queue_persistent_copy;
|
|
DROP TABLE IF EXISTS test.s3_queue_persistent_copy_2;
|
|
|
|
DROP TABLE IF EXISTS test.persistent_s3_queue_mv;
|
|
DROP TABLE IF EXISTS test.persistent_s3_queue_mv_copy;
|
|
DROP TABLE IF EXISTS test.persistent_s3_queue_mv_copy_2;
|
|
|
|
CREATE TABLE test.s3_queue ({table_format})
|
|
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
|
|
SETTINGS
|
|
mode = '{mode}',
|
|
keeper_path = '/clickhouse/test_multiple_consumers_sync_{mode}',
|
|
s3queue_polling_size = {poll_size};
|
|
|
|
CREATE TABLE test.s3_queue_copy ({table_format})
|
|
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
|
|
SETTINGS
|
|
mode = '{mode}',
|
|
keeper_path = '/clickhouse/test_multiple_consumers_sync_{mode}',
|
|
s3queue_polling_size = {poll_size};
|
|
|
|
CREATE TABLE test.s3_queue_copy_2 ({table_format})
|
|
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
|
|
SETTINGS
|
|
mode = '{mode}',
|
|
keeper_path = '/clickhouse/test_multiple_consumers_sync_{mode}',
|
|
s3queue_polling_size = {poll_size};
|
|
|
|
CREATE TABLE test.s3_queue_persistent ({table_format})
|
|
ENGINE = MergeTree()
|
|
ORDER BY column1;
|
|
|
|
CREATE TABLE test.s3_queue_persistent_copy ({table_format})
|
|
ENGINE = MergeTree()
|
|
ORDER BY column1;
|
|
|
|
CREATE TABLE test.s3_queue_persistent_copy_2 ({table_format})
|
|
ENGINE = MergeTree()
|
|
ORDER BY column1;
|
|
|
|
CREATE MATERIALIZED VIEW test.persistent_s3_queue_mv TO test.s3_queue_persistent AS
|
|
SELECT
|
|
*
|
|
FROM test.s3_queue;
|
|
|
|
CREATE MATERIALIZED VIEW test.persistent_s3_queue_mv_copy TO test.s3_queue_persistent_copy AS
|
|
SELECT
|
|
*
|
|
FROM test.s3_queue_copy;
|
|
|
|
CREATE MATERIALIZED VIEW test.persistent_s3_queue_mv_copy_2 TO test.s3_queue_persistent_copy_2 AS
|
|
SELECT
|
|
*
|
|
FROM test.s3_queue_copy_2;
|
|
"""
|
|
)
|
|
total_values = generate_random_files(
|
|
files_to_generate, prefix, started_cluster, bucket, row_num=1
|
|
)
|
|
|
|
def get_count(table_name):
|
|
return int(run_query(instance, f"SELECT count() FROM {table_name}"))
|
|
|
|
for _ in range(100):
|
|
if (
|
|
get_count("test.s3_queue_persistent")
|
|
+ get_count("test.s3_queue_persistent_copy")
|
|
+ get_count("test.s3_queue_persistent_copy_2")
|
|
) == files_to_generate:
|
|
break
|
|
time.sleep(1)
|
|
|
|
get_query = f"SELECT * FROM test.s3_queue_persistent"
|
|
res1 = [
|
|
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
|
|
]
|
|
get_query_copy = f"SELECT * FROM test.s3_queue_persistent_copy"
|
|
res2 = [
|
|
list(map(int, l.split()))
|
|
for l in run_query(instance, get_query_copy).splitlines()
|
|
]
|
|
get_query_copy_2 = f"SELECT * FROM test.s3_queue_persistent_copy_2"
|
|
res3 = [
|
|
list(map(int, l.split()))
|
|
for l in run_query(instance, get_query_copy_2).splitlines()
|
|
]
|
|
assert {tuple(v) for v in res1 + res2 + res3} == set(
|
|
[tuple(i) for i in total_values]
|
|
)
|
|
|
|
# Checking that all files were processed only once
|
|
time.sleep(10)
|
|
assert (
|
|
get_count("test.s3_queue_persistent")
|
|
+ get_count("test.s3_queue_persistent_copy")
|
|
+ get_count("test.s3_queue_persistent_copy_2")
|
|
) == files_to_generate
|
|
|
|
|
|
@pytest.mark.parametrize("mode", AVAILABLE_MODES)
|
|
def test_multiple_tables_streaming_sync_distributed(started_cluster, mode):
|
|
files_to_generate = 100
|
|
poll_size = 2
|
|
prefix = f"test_multiple_{mode}"
|
|
bucket = started_cluster.minio_restricted_bucket
|
|
instance = started_cluster.instances["instance"]
|
|
instance_2 = started_cluster.instances["instance2"]
|
|
|
|
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
|
|
for inst in [instance, instance_2]:
|
|
inst.query(
|
|
f"""
|
|
DROP TABLE IF EXISTS test.s3_queue;
|
|
DROP TABLE IF EXISTS test.s3_queue_persistent;
|
|
DROP TABLE IF EXISTS test.persistent_s3_queue_mv;
|
|
|
|
CREATE TABLE test.s3_queue ({table_format})
|
|
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
|
|
SETTINGS
|
|
mode = '{mode}',
|
|
keeper_path = '/clickhouse/test_multiple_consumers_{mode}',
|
|
s3queue_polling_size = {poll_size};
|
|
|
|
CREATE TABLE test.s3_queue_persistent ({table_format})
|
|
ENGINE = MergeTree()
|
|
ORDER BY column1;
|
|
"""
|
|
)
|
|
|
|
for inst in [instance, instance_2]:
|
|
inst.query(
|
|
f"""
|
|
CREATE MATERIALIZED VIEW test.persistent_s3_queue_mv TO test.s3_queue_persistent AS
|
|
SELECT
|
|
*
|
|
FROM test.s3_queue;
|
|
"""
|
|
)
|
|
|
|
total_values = generate_random_files(
|
|
files_to_generate, prefix, started_cluster, bucket, row_num=1
|
|
)
|
|
|
|
def get_count(node, table_name):
|
|
return int(run_query(node, f"SELECT count() FROM {table_name}"))
|
|
|
|
for _ in range(150):
|
|
if (
|
|
get_count(instance, "test.s3_queue_persistent")
|
|
+ get_count(instance_2, "test.s3_queue_persistent")
|
|
) == files_to_generate:
|
|
break
|
|
time.sleep(1)
|
|
|
|
get_query = f"SELECT * FROM test.s3_queue_persistent"
|
|
res1 = [
|
|
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
|
|
]
|
|
res2 = [
|
|
list(map(int, l.split())) for l in run_query(instance_2, get_query).splitlines()
|
|
]
|
|
|
|
assert len(res1) + len(res2) == files_to_generate
|
|
|
|
# Checking that all engines have made progress
|
|
assert len(res1) > 0
|
|
assert len(res2) > 0
|
|
|
|
assert {tuple(v) for v in res1 + res2} == set([tuple(i) for i in total_values])
|
|
|
|
# Checking that all files were processed only once
|
|
time.sleep(10)
|
|
assert (
|
|
get_count(instance, "test.s3_queue_persistent")
|
|
+ get_count(instance_2, "test.s3_queue_persistent")
|
|
) == files_to_generate
|
|
|
|
|
|
def test_max_set_size(started_cluster):
|
|
files_to_generate = 10
|
|
prefix = f"test_multiple"
|
|
bucket = started_cluster.minio_restricted_bucket
|
|
instance = started_cluster.instances["instance"]
|
|
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
|
|
|
|
instance.query(
|
|
f"""
|
|
DROP TABLE IF EXISTS test.s3_queue;
|
|
|
|
CREATE TABLE test.s3_queue ({table_format})
|
|
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
|
|
SETTINGS
|
|
mode = 'unordered',
|
|
keeper_path = '/clickhouse/test_set_size',
|
|
s3queue_tracked_files_limit = {files_to_generate - 1};
|
|
"""
|
|
)
|
|
|
|
total_values = generate_random_files(
|
|
files_to_generate, prefix, started_cluster, bucket, start_ind=0, row_num=1
|
|
)
|
|
get_query = f"SELECT * FROM test.s3_queue"
|
|
res1 = [
|
|
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
|
|
]
|
|
assert res1 == total_values
|
|
|
|
get_query = f"SELECT * FROM test.s3_queue"
|
|
res1 = [
|
|
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
|
|
]
|
|
assert res1 == [total_values[0]]
|
|
|
|
get_query = f"SELECT * FROM test.s3_queue"
|
|
res1 = [
|
|
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
|
|
]
|
|
assert res1 == [total_values[1]]
|