# ClickHouse/tests/integration/test_storage_s3_queue/test.py
import io
import json
import logging
import os
import random
import time

import pytest

from helpers.client import QueryRuntimeException
from helpers.cluster import ClickHouseCluster, ClickHouseInstance
"""
export CLICKHOUSE_TESTS_SERVER_BIN_PATH=/home/sergey/vkr/ClickHouse/build/programs/clickhouse-server
export CLICKHOUSE_TESTS_CLIENT_BIN_PATH=/home/sergey/vkr/ClickHouse/build/programs/clickhouse-client
export CLICKHOUSE_TESTS_ODBC_BRIDGE_BIN_PATH=/home/sergey/vkr/ClickHouse/build/programs/clickhouse-odbc-bridge
export CLICKHOUSE_TESTS_BASE_CONFIG_DIR=/home/sergey/vkr/ClickHouse/programs/server
"""
def prepare_s3_bucket(started_cluster):
    # Allow anonymous read-write access to the bucket.
bucket_read_write_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Sid": "",
"Effect": "Allow",
"Principal": {"AWS": "*"},
"Action": "s3:GetBucketLocation",
"Resource": "arn:aws:s3:::root",
},
{
"Sid": "",
"Effect": "Allow",
"Principal": {"AWS": "*"},
"Action": "s3:ListBucket",
"Resource": "arn:aws:s3:::root",
},
{
"Sid": "",
"Effect": "Allow",
"Principal": {"AWS": "*"},
"Action": "s3:GetObject",
"Resource": "arn:aws:s3:::root/*",
},
{
"Sid": "",
"Effect": "Allow",
"Principal": {"AWS": "*"},
"Action": "s3:PutObject",
"Resource": "arn:aws:s3:::root/*",
},
{
"Sid": "",
"Effect": "Allow",
"Principal": {"AWS": "*"},
"Action": "s3:DeleteObject",
"Resource": "arn:aws:s3:::root/*",
},
],
}
minio_client = started_cluster.minio_client
minio_client.set_bucket_policy(
started_cluster.minio_bucket, json.dumps(bucket_read_write_policy)
)
started_cluster.minio_restricted_bucket = "{}-with-auth".format(
started_cluster.minio_bucket
)
if minio_client.bucket_exists(started_cluster.minio_restricted_bucket):
minio_client.remove_bucket(started_cluster.minio_restricted_bucket)
    minio_client.make_bucket(started_cluster.minio_restricted_bucket)


@pytest.fixture(autouse=True)
def s3_queue_setup_teardown(started_cluster):
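    # Give every test a fresh `test` database on both instances and an empty
    # restricted bucket so that state does not leak between test cases.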
instance = started_cluster.instances["instance"]
    instance_2 = started_cluster.instances["instance2"]
    instance.query("DROP DATABASE IF EXISTS test; CREATE DATABASE test;")
    instance_2.query("DROP DATABASE IF EXISTS test; CREATE DATABASE test;")
minio = started_cluster.minio_client
objects = list(
minio.list_objects(started_cluster.minio_restricted_bucket, recursive=True)
)
for obj in objects:
        minio.remove_object(started_cluster.minio_restricted_bucket, obj.object_name)

    yield  # run test


MINIO_INTERNAL_PORT = 9001
AVAILABLE_MODES = ["unordered", "ordered"]
AUTH = "'minio','minio123',"

SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))


def put_s3_file_content(started_cluster, bucket, filename, data):
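    # Upload raw bytes to MinIO under the given object name.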
buf = io.BytesIO(data)
    started_cluster.minio_client.put_object(bucket, filename, buf, len(data))


def generate_random_files(
count, prefix, cluster, bucket, column_num=3, row_num=10, start_ind=0
):
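    # Upload `count` CSV files with random UInt32 rows under `prefix` and
    # return all generated rows (in file order) for later comparison.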
total_values = []
to_generate = [
(f"{prefix}/test_{i}.csv", i) for i in range(start_ind, start_ind + count)
]
to_generate.sort(key=lambda x: x[0])
for filename, i in to_generate:
rand_values = [
[random.randint(0, 50) for _ in range(column_num)] for _ in range(row_num)
]
total_values += rand_values
values_csv = (
"\n".join((",".join(map(str, row)) for row in rand_values)) + "\n"
).encode()
put_s3_file_content(cluster, bucket, filename, values_csv)
    return total_values


# Returns content of given S3 file as string.
def get_s3_file_content(started_cluster, bucket, filename, decode=True):
# type: (ClickHouseCluster, str, str, bool) -> str
data = started_cluster.minio_client.get_object(bucket, filename)
data_str = b""
for chunk in data.stream():
data_str += chunk
if decode:
return data_str.decode()
    return data_str


@pytest.fixture(scope="module")
def started_cluster():
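    # Module-wide cluster: two ClickHouse instances sharing one ZooKeeper and
    # one MinIO, so tests can run competing consumers on separate servers.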
try:
cluster = ClickHouseCluster(__file__)
cluster.add_instance(
"instance",
user_configs=["configs/users.xml"],
with_minio=True,
            with_zookeeper=True,
            main_configs=["configs/defaultS3.xml", "configs/named_collections.xml"],
        )

        cluster.add_instance(
"instance2",
user_configs=["configs/users.xml"],
with_minio=True,
with_zookeeper=True,
main_configs=["configs/defaultS3.xml", "configs/named_collections.xml"],
        )

        logging.info("Starting cluster...")
cluster.start()
logging.info("Cluster started")
prepare_s3_bucket(cluster)
yield cluster
finally:
        cluster.shutdown()


def run_query(instance, query, stdin=None, settings=None):
# type: (ClickHouseInstance, str, object, dict) -> str
logging.info("Running query '{}'...".format(query))
result = instance.query(query, stdin=stdin, settings=settings)
logging.info("Query finished")
    return result


@pytest.mark.parametrize("mode", AVAILABLE_MODES)
def test_delete_after_processing(started_cluster, mode):
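    # With after_processing='delete' a SELECT from the queue should return all
    # generated rows, and the processed files should disappear from the bucket.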
prefix = "delete"
bucket = started_cluster.minio_bucket
instance = started_cluster.instances["instance"]
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
total_values = generate_random_files(5, prefix, started_cluster, bucket)
instance.query(
f"""
DROP TABLE IF EXISTS test.s3_queue;
CREATE TABLE test.s3_queue ({table_format})
ENGINE = S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
SETTINGS
mode = '{mode}',
keeper_path = '/clickhouse/test_delete_{mode}',
s3queue_loading_retries = 3,
after_processing='delete';
"""
)
get_query = f"SELECT * FROM test.s3_queue"
assert [
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
] == total_values
minio = started_cluster.minio_client
objects = list(minio.list_objects(started_cluster.minio_bucket, recursive=True))
    assert len(objects) == 0


@pytest.mark.parametrize("mode", AVAILABLE_MODES)
def test_failed_retry(started_cluster, mode):
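    # A file that fails to parse ("failed" is not a UInt32) yields no rows on
    # repeated reads; once replaced with valid data under the same name, its
    # rows are returned exactly once.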
bucket = started_cluster.minio_restricted_bucket
instance = started_cluster.instances["instance"]
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
values = [
["failed", 1, 1],
]
values_csv = (
"\n".join((",".join(map(str, row)) for row in values)) + "\n"
).encode()
filename = f"test.csv"
put_s3_file_content(started_cluster, bucket, filename, values_csv)
instance.query(
f"""
DROP TABLE IF EXISTS test.s3_queue;
CREATE TABLE test.s3_queue ({table_format})
ENGINE = S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/*', {AUTH}'CSV')
SETTINGS
mode = '{mode}',
keeper_path = '/clickhouse/select_failed_retry_{mode}',
s3queue_loading_retries = 3;
"""
)
# first try
get_query = f"SELECT * FROM test.s3_queue"
assert [
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
] == []
# second try
assert [
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
] == []
# upload correct file
values = [
[1, 1, 1],
]
values_csv = (
"\n".join((",".join(map(str, row)) for row in values)) + "\n"
).encode()
put_s3_file_content(started_cluster, bucket, filename, values_csv)
assert [
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
] == values
assert [
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
    ] == []


@pytest.mark.parametrize("mode", AVAILABLE_MODES)
def test_direct_select_file(started_cluster, mode):
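    # A direct SELECT consumes each file once per keeper_path: a second table
    # on the same path sees nothing, a table on a fresh path re-reads the file,
    # and a new file with a lexicographically smaller name is only picked up in
    # unordered mode.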
auth = "'minio','minio123',"
bucket = started_cluster.minio_restricted_bucket
instance = started_cluster.instances["instance"]
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
values = [
[12549, 2463, 19893],
[64021, 38652, 66703],
[81611, 39650, 83516],
]
    values_csv = (
        "\n".join((",".join(map(str, row)) for row in values)) + "\n"
    ).encode()
filename = f"test.csv"
put_s3_file_content(started_cluster, bucket, filename, values_csv)
instance.query(
"""
DROP TABLE IF EXISTS test.s3_queue;
DROP TABLE IF EXISTS test.s3_queue_2;
DROP TABLE IF EXISTS test.s3_queue_3;
"""
    )

    instance.query(
        f"""
CREATE TABLE test.s3_queue ({table_format})
ENGINE = S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/*', {auth}'CSV')
SETTINGS
mode = '{mode}',
keeper_path = '/clickhouse/select_{mode}'
"""
    )

    get_query = f"SELECT * FROM test.s3_queue"
    assert [
        list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
    ] == values

    instance.query(
f"""
CREATE TABLE test.s3_queue_2 ({table_format})
ENGINE = S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/*', {auth}'CSV')
SETTINGS
mode = '{mode}',
keeper_path = '/clickhouse/select_{mode}'
"""
)
get_query = f"SELECT * FROM test.s3_queue"
assert [
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
] == []
# New table with same zookeeper path
get_query = f"SELECT * FROM test.s3_queue_2"
assert [
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
] == []
# New table with different zookeeper path
instance.query(
f"""
CREATE TABLE test.s3_queue_3 ({table_format})
ENGINE = S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/*', {auth}'CSV')
SETTINGS
mode = '{mode}',
keeper_path='/clickhouse/select_{mode}_2'
"""
)
get_query = f"SELECT * FROM test.s3_queue_3"
    assert [
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
] == values
values = [
[1, 1, 1],
]
values_csv = (
"\n".join((",".join(map(str, row)) for row in values)) + "\n"
).encode()
filename = f"t.csv"
put_s3_file_content(started_cluster, bucket, filename, values_csv)
get_query = f"SELECT * FROM test.s3_queue_3"
if mode == "unordered":
assert [
list(map(int, l.split()))
for l in run_query(instance, get_query).splitlines()
] == values
elif mode == "ordered":
assert [
list(map(int, l.split()))
for l in run_query(instance, get_query).splitlines()
        ] == []


@pytest.mark.parametrize("mode", AVAILABLE_MODES)
def test_direct_select_multiple_files(started_cluster, mode):
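    # Every newly uploaded file is returned by the next direct SELECT, and a
    # batch of new files is returned together by a single SELECT.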
prefix = f"multiple_files_{mode}"
bucket = started_cluster.minio_restricted_bucket
instance = started_cluster.instances["instance"]
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
instance.query("drop table if exists test.s3_queue")
instance.query(
f"""
CREATE TABLE test.s3_queue ({table_format})
ENGINE = S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
SETTINGS
mode = '{mode}',
keeper_path = '/clickhouse/select_multiple_{mode}'
"""
)
    for i in range(5):
rand_values = [[random.randint(0, 50) for _ in range(3)] for _ in range(10)]
values_csv = (
"\n".join((",".join(map(str, row)) for row in rand_values)) + "\n"
).encode()
filename = f"{prefix}/test_{i}.csv"
put_s3_file_content(started_cluster, bucket, filename, values_csv)
get_query = f"SELECT * FROM test.s3_queue"
assert [
list(map(int, l.split()))
for l in run_query(instance, get_query).splitlines()
] == rand_values
total_values = generate_random_files(
        4, prefix, started_cluster, bucket, start_ind=5
)
get_query = f"SELECT * FROM test.s3_queue"
assert {
tuple(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
    } == set([tuple(i) for i in total_values])


@pytest.mark.parametrize("mode", AVAILABLE_MODES)
def test_streaming_to_view(started_cluster, mode):
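    # Rows from the queue should be streamed through a materialized view into
    # a MergeTree table in the background; poll until all rows have arrived.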
prefix = f"streaming_files_{mode}"
bucket = started_cluster.minio_restricted_bucket
instance = started_cluster.instances["instance"]
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
total_values = generate_random_files(10, prefix, started_cluster, bucket)
instance.query(
f"""
DROP TABLE IF EXISTS test.s3_queue_persistent;
DROP TABLE IF EXISTS test.s3_queue;
DROP TABLE IF EXISTS test.persistent_s3_queue_mv;
CREATE TABLE test.s3_queue_persistent ({table_format})
ENGINE = MergeTree()
ORDER BY column1;
CREATE TABLE test.s3_queue ({table_format})
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
SETTINGS
mode = '{mode}',
            keeper_path = '/clickhouse/view_{mode}';
CREATE MATERIALIZED VIEW test.persistent_s3_queue_mv TO test.s3_queue_persistent AS
SELECT
*
FROM test.s3_queue;
"""
)
expected_values = set([tuple(i) for i in total_values])
for i in range(10):
get_query = f"SELECT * FROM test.persistent_s3_queue_mv"
selected_values = {
tuple(map(int, l.split()))
for l in run_query(instance, get_query).splitlines()
}
if selected_values != expected_values:
time.sleep(1)
else:
break
    assert selected_values == expected_values


@pytest.mark.parametrize("mode", AVAILABLE_MODES)
def test_streaming_to_many_views(started_cluster, mode):
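    # Three materialized views attached to the same queue table should each
    # receive every row read from the queue.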
prefix = f"streaming_files_{mode}"
bucket = started_cluster.minio_restricted_bucket
instance = started_cluster.instances["instance"]
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
retry_cnt = 10
instance.query(
f"""
DROP TABLE IF EXISTS test.s3_queue_persistent;
DROP TABLE IF EXISTS test.s3_queue_persistent_2;
DROP TABLE IF EXISTS test.s3_queue_persistent_3;
DROP TABLE IF EXISTS test.s3_queue;
DROP TABLE IF EXISTS test.persistent_s3_queue_mv;
DROP TABLE IF EXISTS test.persistent_s3_queue_mv_2;
DROP TABLE IF EXISTS test.persistent_s3_queue_mv_3;
CREATE TABLE test.s3_queue_persistent ({table_format})
ENGINE = MergeTree()
ORDER BY column1;
CREATE TABLE test.s3_queue_persistent_2 ({table_format})
ENGINE = MergeTree()
ORDER BY column1;
CREATE TABLE test.s3_queue_persistent_3 ({table_format})
ENGINE = MergeTree()
ORDER BY column1;
CREATE TABLE test.s3_queue ({table_format})
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
SETTINGS
mode = '{mode}',
            keeper_path = '/clickhouse/multiple_view_{mode}';
CREATE MATERIALIZED VIEW test.persistent_s3_queue_mv TO test.s3_queue_persistent AS
SELECT
*
FROM test.s3_queue;
CREATE MATERIALIZED VIEW test.persistent_s3_queue_mv_2 TO test.s3_queue_persistent_2 AS
SELECT
*
FROM test.s3_queue;
CREATE MATERIALIZED VIEW test.persistent_s3_queue_mv_3 TO test.s3_queue_persistent_3 AS
SELECT
*
FROM test.s3_queue;
"""
)
    total_values = generate_random_files(5, prefix, started_cluster, bucket)
expected_values = set([tuple(i) for i in total_values])
for i in range(retry_cnt):
retry = False
for get_query in [
f"SELECT * FROM test.s3_queue_persistent",
f"SELECT * FROM test.s3_queue_persistent_2",
f"SELECT * FROM test.s3_queue_persistent_3",
]:
selected_values = {
tuple(map(int, l.split()))
for l in run_query(instance, get_query).splitlines()
}
if i == retry_cnt - 1:
assert selected_values == expected_values
if selected_values != expected_values:
retry = True
break
if retry:
time.sleep(1)
else:
            break


def test_multiple_tables_meta_mismatch(started_cluster):
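    # Reusing a keeper_path must fail when the engine mode, column structure
    # or format differs from the metadata stored in ZooKeeper, and must
    # succeed when everything matches.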
prefix = f"test_meta"
bucket = started_cluster.minio_restricted_bucket
instance = started_cluster.instances["instance"]
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
instance.query(
f"""
DROP TABLE IF EXISTS test.s3_queue;
CREATE TABLE test.s3_queue ({table_format})
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
SETTINGS
mode = 'ordered',
keeper_path = '/clickhouse/test_meta';
"""
)
# check mode
failed = False
try:
instance.query(
f"""
CREATE TABLE test.s3_queue_copy ({table_format})
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
SETTINGS
mode = 'unordered',
keeper_path = '/clickhouse/test_meta';
"""
)
except QueryRuntimeException as e:
assert "Existing table metadata in ZooKeeper differs in engine mode" in str(e)
failed = True
assert failed is True
    # check columns
    failed = False
    table_format_copy = table_format + ", column4 UInt32"
try:
instance.query(
f"""
CREATE TABLE test.s3_queue_copy ({table_format_copy})
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
SETTINGS
mode = 'ordered',
keeper_path = '/clickhouse/test_meta';
"""
)
except QueryRuntimeException as e:
assert (
"Table columns structure in ZooKeeper is different from local table structure"
in str(e)
)
failed = True
assert failed is True
    # check format
    failed = False
try:
instance.query(
f"""
CREATE TABLE test.s3_queue_copy ({table_format})
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'TSV')
SETTINGS
mode = 'ordered',
keeper_path = '/clickhouse/test_meta';
"""
)
except QueryRuntimeException as e:
assert "Existing table metadata in ZooKeeper differs in format name" in str(e)
failed = True
assert failed is True
# create working engine
instance.query(
f"""
CREATE TABLE test.s3_queue_copy ({table_format})
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
SETTINGS
mode = 'ordered',
keeper_path = '/clickhouse/test_meta';
"""
    )


def test_max_set_age(started_cluster):
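    # With s3queue_tracked_file_ttl_sec set, tracked files expire after the
    # TTL, so a later SELECT should return the same files again.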
files_to_generate = 10
max_age = 1
prefix = f"test_multiple"
bucket = started_cluster.minio_restricted_bucket
instance = started_cluster.instances["instance"]
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
instance.query(
f"""
DROP TABLE IF EXISTS test.s3_queue;
CREATE TABLE test.s3_queue ({table_format})
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
SETTINGS
mode = 'unordered',
keeper_path = '/clickhouse/test_set_age',
s3queue_tracked_files_limit = 10,
s3queue_tracked_file_ttl_sec = {max_age};
"""
)
total_values = generate_random_files(
files_to_generate, prefix, started_cluster, bucket, row_num=1
)
get_query = f"SELECT * FROM test.s3_queue"
res1 = [
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
]
assert res1 == total_values
time.sleep(max_age + 1)
get_query = f"SELECT * FROM test.s3_queue"
res1 = [
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
]
    assert res1 == total_values


@pytest.mark.parametrize("mode", AVAILABLE_MODES)
def test_multiple_tables_streaming_sync(started_cluster, mode):
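    # Three queue tables sharing one keeper_path act as competing consumers:
    # through their materialized views they should together process every file
    # exactly once.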
files_to_generate = 300
poll_size = 30
prefix = f"test_multiple_{mode}"
bucket = started_cluster.minio_restricted_bucket
instance = started_cluster.instances["instance"]
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
instance.query(
f"""
DROP TABLE IF EXISTS test.s3_queue;
DROP TABLE IF EXISTS test.s3_queue_copy;
DROP TABLE IF EXISTS test.s3_queue_copy_2;
DROP TABLE IF EXISTS test.s3_queue_persistent;
DROP TABLE IF EXISTS test.s3_queue_persistent_copy;
DROP TABLE IF EXISTS test.s3_queue_persistent_copy_2;
DROP TABLE IF EXISTS test.persistent_s3_queue_mv;
DROP TABLE IF EXISTS test.persistent_s3_queue_mv_copy;
DROP TABLE IF EXISTS test.persistent_s3_queue_mv_copy_2;
CREATE TABLE test.s3_queue ({table_format})
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
SETTINGS
mode = '{mode}',
keeper_path = '/clickhouse/test_multiple_consumers_sync_{mode}',
s3queue_polling_size = {poll_size};
CREATE TABLE test.s3_queue_copy ({table_format})
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
SETTINGS
mode = '{mode}',
keeper_path = '/clickhouse/test_multiple_consumers_sync_{mode}',
s3queue_polling_size = {poll_size};
CREATE TABLE test.s3_queue_copy_2 ({table_format})
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
SETTINGS
mode = '{mode}',
keeper_path = '/clickhouse/test_multiple_consumers_sync_{mode}',
s3queue_polling_size = {poll_size};
CREATE TABLE test.s3_queue_persistent ({table_format})
ENGINE = MergeTree()
ORDER BY column1;
CREATE TABLE test.s3_queue_persistent_copy ({table_format})
ENGINE = MergeTree()
ORDER BY column1;
CREATE TABLE test.s3_queue_persistent_copy_2 ({table_format})
ENGINE = MergeTree()
ORDER BY column1;
CREATE MATERIALIZED VIEW test.persistent_s3_queue_mv TO test.s3_queue_persistent AS
SELECT
*
FROM test.s3_queue;
CREATE MATERIALIZED VIEW test.persistent_s3_queue_mv_copy TO test.s3_queue_persistent_copy AS
SELECT
*
FROM test.s3_queue_copy;
CREATE MATERIALIZED VIEW test.persistent_s3_queue_mv_copy_2 TO test.s3_queue_persistent_copy_2 AS
SELECT
*
FROM test.s3_queue_copy_2;
"""
)
total_values = generate_random_files(
files_to_generate, prefix, started_cluster, bucket, row_num=1
    )

    def get_count(table_name):
        return int(run_query(instance, f"SELECT count() FROM {table_name}"))

    for _ in range(100):
if (
get_count("test.s3_queue_persistent")
+ get_count("test.s3_queue_persistent_copy")
+ get_count("test.s3_queue_persistent_copy_2")
) == files_to_generate:
break
        time.sleep(1)

    get_query = f"SELECT * FROM test.s3_queue_persistent"
res1 = [
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
]
get_query_copy = f"SELECT * FROM test.s3_queue_persistent_copy"
res2 = [
list(map(int, l.split()))
for l in run_query(instance, get_query_copy).splitlines()
]
get_query_copy_2 = f"SELECT * FROM test.s3_queue_persistent_copy_2"
res3 = [
list(map(int, l.split()))
for l in run_query(instance, get_query_copy_2).splitlines()
]
assert {tuple(v) for v in res1 + res2 + res3} == set(
[tuple(i) for i in total_values]
    )

    # Checking that all files were processed only once
time.sleep(10)
assert (
get_count("test.s3_queue_persistent")
+ get_count("test.s3_queue_persistent_copy")
+ get_count("test.s3_queue_persistent_copy_2")
    ) == files_to_generate


@pytest.mark.parametrize("mode", AVAILABLE_MODES)
def test_multiple_tables_streaming_sync_distributed(started_cluster, mode):
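    # Same competing-consumer check as above, but with the consumers spread
    # across two servers coordinating through ZooKeeper.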
files_to_generate = 100
    poll_size = 2
prefix = f"test_multiple_{mode}"
bucket = started_cluster.minio_restricted_bucket
instance = started_cluster.instances["instance"]
instance_2 = started_cluster.instances["instance2"]
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
for inst in [instance, instance_2]:
inst.query(
f"""
DROP TABLE IF EXISTS test.s3_queue;
DROP TABLE IF EXISTS test.s3_queue_persistent;
DROP TABLE IF EXISTS test.persistent_s3_queue_mv;
CREATE TABLE test.s3_queue ({table_format})
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
SETTINGS
mode = '{mode}',
keeper_path = '/clickhouse/test_multiple_consumers_{mode}',
s3queue_polling_size = {poll_size};
CREATE TABLE test.s3_queue_persistent ({table_format})
ENGINE = MergeTree()
ORDER BY column1;
"""
        )

    for inst in [instance, instance_2]:
inst.query(
f"""
CREATE MATERIALIZED VIEW test.persistent_s3_queue_mv TO test.s3_queue_persistent AS
SELECT
*
FROM test.s3_queue;
"""
)
total_values = generate_random_files(
files_to_generate, prefix, started_cluster, bucket, row_num=1
    )

    def get_count(node, table_name):
        return int(run_query(node, f"SELECT count() FROM {table_name}"))
    for _ in range(150):
if (
get_count(instance, "test.s3_queue_persistent")
+ get_count(instance_2, "test.s3_queue_persistent")
) == files_to_generate:
break
        time.sleep(1)

    get_query = f"SELECT * FROM test.s3_queue_persistent"
res1 = [
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
]
res2 = [
list(map(int, l.split())) for l in run_query(instance_2, get_query).splitlines()
]
    assert len(res1) + len(res2) == files_to_generate
# Checking that all engines have made progress
assert len(res1) > 0
assert len(res2) > 0
    assert {tuple(v) for v in res1 + res2} == set([tuple(i) for i in total_values])

    # Checking that all files were processed only once
time.sleep(10)
assert (
get_count(instance, "test.s3_queue_persistent")
+ get_count(instance_2, "test.s3_queue_persistent")
    ) == files_to_generate


def test_max_set_size(started_cluster):
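    # With s3queue_tracked_files_limit one less than the number of files, the
    # oldest tracked file is evicted from the set and is returned again by the
    # next SELECT.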
files_to_generate = 10
prefix = f"test_multiple"
bucket = started_cluster.minio_restricted_bucket
instance = started_cluster.instances["instance"]
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
instance.query(
f"""
DROP TABLE IF EXISTS test.s3_queue;
CREATE TABLE test.s3_queue ({table_format})
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
SETTINGS
mode = 'unordered',
keeper_path = '/clickhouse/test_set_size',
s3queue_tracked_files_limit = {files_to_generate - 1};
"""
)
total_values = generate_random_files(
files_to_generate, prefix, started_cluster, bucket, start_ind=0, row_num=1
)
get_query = f"SELECT * FROM test.s3_queue"
res1 = [
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
]
assert res1 == total_values
get_query = f"SELECT * FROM test.s3_queue"
res1 = [
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
]
assert res1 == [total_values[0]]
get_query = f"SELECT * FROM test.s3_queue"
res1 = [
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
]
assert res1 == [total_values[1]]