# ClickHouse/tests/integration/test_storage_s3_queue/test.py
import io
import json
import logging
import os
import random
import time

import pytest
from helpers.client import QueryRuntimeException
from helpers.cluster import ClickHouseCluster, ClickHouseInstance
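
# Integration tests for the S3Queue table engine: files uploaded to an S3
# (MinIO) bucket are ingested by ClickHouse, with per-file processing state
# kept in ZooKeeper ('unordered' and 'ordered' modes).
#
# The string literal below lists environment variables for running these
# tests against a locally built ClickHouse.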
"""
export CLICKHOUSE_TESTS_SERVER_BIN_PATH=/home/sergey/vkr/ClickHouse/build/programs/clickhouse-server
export CLICKHOUSE_TESTS_CLIENT_BIN_PATH=/home/sergey/vkr/ClickHouse/build/programs/clickhouse-client
export CLICKHOUSE_TESTS_ODBC_BRIDGE_BIN_PATH=/home/sergey/vkr/ClickHouse/build/programs/clickhouse-odbc-bridge
export CLICKHOUSE_TESTS_BASE_CONFIG_DIR=/home/sergey/vkr/ClickHouse/programs/server
"""
def prepare_s3_bucket(started_cluster):
    # Allow read-write access to the bucket without authorization.
bucket_read_write_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Sid": "",
"Effect": "Allow",
"Principal": {"AWS": "*"},
"Action": "s3:GetBucketLocation",
"Resource": "arn:aws:s3:::root",
},
{
"Sid": "",
"Effect": "Allow",
"Principal": {"AWS": "*"},
"Action": "s3:ListBucket",
"Resource": "arn:aws:s3:::root",
},
{
"Sid": "",
"Effect": "Allow",
"Principal": {"AWS": "*"},
"Action": "s3:GetObject",
"Resource": "arn:aws:s3:::root/*",
},
{
"Sid": "",
"Effect": "Allow",
"Principal": {"AWS": "*"},
"Action": "s3:PutObject",
"Resource": "arn:aws:s3:::root/*",
},
{
"Sid": "",
"Effect": "Allow",
"Principal": {"AWS": "*"},
"Action": "s3:DeleteObject",
"Resource": "arn:aws:s3:::root/*",
},
],
}
minio_client = started_cluster.minio_client
minio_client.set_bucket_policy(
started_cluster.minio_bucket, json.dumps(bucket_read_write_policy)
)
started_cluster.minio_restricted_bucket = "{}-with-auth".format(
started_cluster.minio_bucket
)
if minio_client.bucket_exists(started_cluster.minio_restricted_bucket):
minio_client.remove_bucket(started_cluster.minio_restricted_bucket)
minio_client.make_bucket(started_cluster.minio_restricted_bucket)
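

# Autouse setup: recreate the `test` database on both instances and empty the
# restricted bucket, so every test starts from a clean slate.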
@pytest.fixture(autouse=True)
def s3_queue_setup_teardown(started_cluster):
instance = started_cluster.instances["instance"]
instance_2 = started_cluster.instances["instance2"]
instance.query("DROP DATABASE IF EXISTS test; CREATE DATABASE test;")
instance_2.query("DROP DATABASE IF EXISTS test; CREATE DATABASE test;")
minio = started_cluster.minio_client
objects = list(
minio.list_objects(started_cluster.minio_restricted_bucket, recursive=True)
)
for obj in objects:
minio.remove_object(started_cluster.minio_restricted_bucket, obj.object_name)
yield # run test
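

# Shared constants. AUTH carries inline 'user','password', credentials
# (trailing comma included) for splicing into S3Queue engine arguments.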
MINIO_INTERNAL_PORT = 9001
AVAILABLE_MODES = ["unordered", "ordered"]
AUTH = "'minio','minio123',"
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
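

# Upload raw bytes to MinIO under the given bucket and key.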
def put_s3_file_content(started_cluster, bucket, filename, data):
buf = io.BytesIO(data)
started_cluster.minio_client.put_object(bucket, filename, buf, len(data))
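

# Generate `count` CSV files of random integers under `prefix`, upload them,
# and return all generated rows in filename order.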
def generate_random_files(
count, prefix, cluster, bucket, column_num=3, row_num=10, start_ind=0
):
total_values = []
to_generate = [
(f"{prefix}/test_{i}.csv", i) for i in range(start_ind, start_ind + count)
]
to_generate.sort(key=lambda x: x[0])
for filename, i in to_generate:
rand_values = [
[random.randint(0, 50) for _ in range(column_num)] for _ in range(row_num)
]
total_values += rand_values
values_csv = (
"\n".join((",".join(map(str, row)) for row in rand_values)) + "\n"
).encode()
put_s3_file_content(cluster, bucket, filename, values_csv)
return total_values


# Returns the content of the given S3 object, decoded to str by default
# (raw bytes when decode=False).
def get_s3_file_content(started_cluster, bucket, filename, decode=True):
# type: (ClickHouseCluster, str, str, bool) -> str
data = started_cluster.minio_client.get_object(bucket, filename)
data_str = b""
for chunk in data.stream():
data_str += chunk
if decode:
return data_str.decode()
return data_str
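

# Module-wide cluster: two ClickHouse instances sharing MinIO and ZooKeeper.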
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster = ClickHouseCluster(__file__)
cluster.add_instance(
"instance",
user_configs=["configs/users.xml"],
with_minio=True,
with_zookeeper=True,
main_configs=["configs/defaultS3.xml", "configs/named_collections.xml"],
)
cluster.add_instance(
"instance2",
user_configs=["configs/users.xml"],
with_minio=True,
with_zookeeper=True,
main_configs=["configs/defaultS3.xml", "configs/named_collections.xml"],
)
logging.info("Starting cluster...")
cluster.start()
logging.info("Cluster started")
prepare_s3_bucket(cluster)
yield cluster
finally:
cluster.shutdown()
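

# Thin wrapper over instance.query() that logs the statement being run.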
def run_query(instance, query, stdin=None, settings=None):
# type: (ClickHouseInstance, str, object, dict) -> str
logging.info("Running query '{}'...".format(query))
result = instance.query(query, stdin=stdin, settings=settings)
logging.info("Query finished")
return result
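

# With after_processing='delete', every file should be removed from the
# bucket once its rows have been read through the table.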
@pytest.mark.parametrize("mode", AVAILABLE_MODES)
def test_delete_after_processing(started_cluster, mode):
prefix = "delete"
bucket = started_cluster.minio_bucket
instance = started_cluster.instances["instance"]
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
total_values = generate_random_files(5, prefix, started_cluster, bucket)
instance.query(
f"""
DROP TABLE IF EXISTS test.s3_queue;
CREATE TABLE test.s3_queue ({table_format})
ENGINE = S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
SETTINGS
mode = '{mode}',
keeper_path = '/clickhouse/test_delete_{mode}',
s3queue_loading_retries = 3,
after_processing='delete';
"""
)
get_query = f"SELECT * FROM test.s3_queue"
assert [
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
] == total_values
minio = started_cluster.minio_client
objects = list(minio.list_objects(started_cluster.minio_bucket, recursive=True))
assert len(objects) == 0
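

# A file that fails to parse ('failed' is not a valid UInt32) must not be
# marked as processed: once it is overwritten with valid data, a later SELECT
# picks it up, and the rows are delivered exactly once.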
@pytest.mark.parametrize("mode", AVAILABLE_MODES)
def test_failed_retry(started_cluster, mode):
bucket = started_cluster.minio_restricted_bucket
instance = started_cluster.instances["instance"]
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
values = [
["failed", 1, 1],
]
values_csv = (
"\n".join((",".join(map(str, row)) for row in values)) + "\n"
).encode()
filename = f"test.csv"
put_s3_file_content(started_cluster, bucket, filename, values_csv)
instance.query(
f"""
DROP TABLE IF EXISTS test.s3_queue;
CREATE TABLE test.s3_queue ({table_format})
ENGINE = S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/*', {AUTH}'CSV')
SETTINGS
mode = '{mode}',
keeper_path = '/clickhouse/select_failed_retry_{mode}',
s3queue_loading_retries = 3;
"""
)
# first try
get_query = f"SELECT * FROM test.s3_queue"
assert [
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
] == []
# second try
assert [
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
] == []
# upload correct file
values = [
[1, 1, 1],
]
values_csv = (
"\n".join((",".join(map(str, row)) for row in values)) + "\n"
).encode()
put_s3_file_content(started_cluster, bucket, filename, values_csv)
assert [
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
] == values
assert [
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
] == []
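

# Direct SELECTs from S3Queue tables: a second table sharing the same
# keeper_path sees the file as already processed, while a table with a fresh
# keeper_path reads it again. Finally, 'ordered' mode is expected to skip a
# new file whose name sorts before the last processed one ('t.csv' <
# 'test.csv'), whereas 'unordered' mode picks it up.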
@pytest.mark.parametrize("mode", AVAILABLE_MODES)
def test_direct_select_file(started_cluster, mode):
auth = "'minio','minio123',"
bucket = started_cluster.minio_restricted_bucket
instance = started_cluster.instances["instance"]
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
values = [
[12549, 2463, 19893],
[64021, 38652, 66703],
[81611, 39650, 83516],
]
values_csv = (
"\n".join((",".join(map(str, row)) for row in values)) + "\n"
).encode()
filename = f"test.csv"
put_s3_file_content(started_cluster, bucket, filename, values_csv)
instance.query(
"""
DROP TABLE IF EXISTS test.s3_queue;
DROP TABLE IF EXISTS test.s3_queue_2;
DROP TABLE IF EXISTS test.s3_queue_3;
"""
)
instance.query(
f"""
CREATE TABLE test.s3_queue ({table_format})
ENGINE = S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/*', {auth}'CSV')
SETTINGS
mode = '{mode}',
keeper_path = '/clickhouse/select_{mode}'
"""
)
get_query = f"SELECT * FROM test.s3_queue"
assert [
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
] == values
instance.query(
f"""
CREATE TABLE test.s3_queue_2 ({table_format})
ENGINE = S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/*', {auth}'CSV')
SETTINGS
mode = '{mode}',
keeper_path = '/clickhouse/select_{mode}'
"""
)
get_query = f"SELECT * FROM test.s3_queue"
assert [
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
] == []
# New table with same zookeeper path
get_query = f"SELECT * FROM test.s3_queue_2"
assert [
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
] == []
# New table with different zookeeper path
instance.query(
f"""
CREATE TABLE test.s3_queue_3 ({table_format})
ENGINE = S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/*', {auth}'CSV')
SETTINGS
mode = '{mode}',
keeper_path='/clickhouse/select_{mode}_2'
"""
)
get_query = f"SELECT * FROM test.s3_queue_3"
assert [
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
] == values
values = [
[1, 1, 1],
]
values_csv = (
"\n".join((",".join(map(str, row)) for row in values)) + "\n"
).encode()
filename = f"t.csv"
put_s3_file_content(started_cluster, bucket, filename, values_csv)
get_query = f"SELECT * FROM test.s3_queue_3"
if mode == "unordered":
assert [
list(map(int, l.split()))
for l in run_query(instance, get_query).splitlines()
] == values
elif mode == "ordered":
assert [
list(map(int, l.split()))
for l in run_query(instance, get_query).splitlines()
] == []
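

# Successive direct SELECTs return each newly uploaded file exactly once, and
# a batch of files uploaded together is read back in full.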
@pytest.mark.parametrize("mode", AVAILABLE_MODES)
def test_direct_select_multiple_files(started_cluster, mode):
prefix = f"multiple_files_{mode}"
bucket = started_cluster.minio_restricted_bucket
instance = started_cluster.instances["instance"]
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
instance.query("drop table if exists test.s3_queue")
instance.query(
f"""
CREATE TABLE test.s3_queue ({table_format})
ENGINE = S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
SETTINGS
mode = '{mode}',
keeper_path = '/clickhouse/select_multiple_{mode}'
"""
)
for i in range(5):
rand_values = [[random.randint(0, 50) for _ in range(3)] for _ in range(10)]
values_csv = (
"\n".join((",".join(map(str, row)) for row in rand_values)) + "\n"
).encode()
filename = f"{prefix}/test_{i}.csv"
put_s3_file_content(started_cluster, bucket, filename, values_csv)
get_query = f"SELECT * FROM test.s3_queue"
assert [
list(map(int, l.split()))
for l in run_query(instance, get_query).splitlines()
] == rand_values
total_values = generate_random_files(
4, prefix, started_cluster, bucket, start_ind=5
)
get_query = f"SELECT * FROM test.s3_queue"
assert {
tuple(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
} == set([tuple(i) for i in total_values])
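

# Background streaming: a materialized view moves rows from the S3Queue table
# into a MergeTree table; poll until all generated rows have arrived.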
@pytest.mark.parametrize("mode", AVAILABLE_MODES)
def test_streaming_to_view(started_cluster, mode):
prefix = f"streaming_files_{mode}"
bucket = started_cluster.minio_restricted_bucket
instance = started_cluster.instances["instance"]
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
total_values = generate_random_files(10, prefix, started_cluster, bucket)
instance.query(
f"""
DROP TABLE IF EXISTS test.s3_queue_persistent;
DROP TABLE IF EXISTS test.s3_queue;
DROP TABLE IF EXISTS test.persistent_s3_queue_mv;
CREATE TABLE test.s3_queue_persistent ({table_format})
ENGINE = MergeTree()
ORDER BY column1;
CREATE TABLE test.s3_queue ({table_format})
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
SETTINGS
mode = '{mode}',
keeper_path = '/clickhouse/view_{mode}';
CREATE MATERIALIZED VIEW test.persistent_s3_queue_mv TO test.s3_queue_persistent AS
SELECT
*
FROM test.s3_queue;
"""
)
expected_values = set([tuple(i) for i in total_values])
for i in range(10):
get_query = f"SELECT * FROM test.persistent_s3_queue_mv"
selected_values = {
tuple(map(int, l.split()))
for l in run_query(instance, get_query).splitlines()
}
if selected_values != expected_values:
time.sleep(1)
else:
break
assert selected_values == expected_values
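

# Three materialized views attached to a single S3Queue table should each
# receive the complete set of rows.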
@pytest.mark.parametrize("mode", AVAILABLE_MODES)
def test_streaming_to_many_views(started_cluster, mode):
prefix = f"streaming_files_{mode}"
bucket = started_cluster.minio_restricted_bucket
instance = started_cluster.instances["instance"]
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
retry_cnt = 10
instance.query(
f"""
DROP TABLE IF EXISTS test.s3_queue_persistent;
DROP TABLE IF EXISTS test.s3_queue_persistent_2;
DROP TABLE IF EXISTS test.s3_queue_persistent_3;
DROP TABLE IF EXISTS test.s3_queue;
DROP TABLE IF EXISTS test.persistent_s3_queue_mv;
DROP TABLE IF EXISTS test.persistent_s3_queue_mv_2;
DROP TABLE IF EXISTS test.persistent_s3_queue_mv_3;
CREATE TABLE test.s3_queue_persistent ({table_format})
ENGINE = MergeTree()
ORDER BY column1;
CREATE TABLE test.s3_queue_persistent_2 ({table_format})
ENGINE = MergeTree()
ORDER BY column1;
CREATE TABLE test.s3_queue_persistent_3 ({table_format})
ENGINE = MergeTree()
ORDER BY column1;
CREATE TABLE test.s3_queue ({table_format})
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
SETTINGS
mode = '{mode}',
keeper_path = '/clickhouse/multiple_view_{mode}';
CREATE MATERIALIZED VIEW test.persistent_s3_queue_mv TO test.s3_queue_persistent AS
SELECT
*
FROM test.s3_queue;
CREATE MATERIALIZED VIEW test.persistent_s3_queue_mv_2 TO test.s3_queue_persistent_2 AS
SELECT
*
FROM test.s3_queue;
CREATE MATERIALIZED VIEW test.persistent_s3_queue_mv_3 TO test.s3_queue_persistent_3 AS
SELECT
*
FROM test.s3_queue;
"""
)
total_values = generate_random_files(5, prefix, started_cluster, bucket)
expected_values = set([tuple(i) for i in total_values])
for i in range(retry_cnt):
retry = False
for get_query in [
f"SELECT * FROM test.s3_queue_persistent",
f"SELECT * FROM test.s3_queue_persistent_2",
f"SELECT * FROM test.s3_queue_persistent_3",
]:
selected_values = {
tuple(map(int, l.split()))
for l in run_query(instance, get_query).splitlines()
}
if i == retry_cnt - 1:
assert selected_values == expected_values
if selected_values != expected_values:
retry = True
break
if retry:
time.sleep(1)
else:
break
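

# Creating a second table on an existing keeper_path must fail if the mode,
# column structure, or format differs from the metadata already stored in
# ZooKeeper; an identical definition is accepted.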
def test_multiple_tables_meta_mismatch(started_cluster):
prefix = f"test_meta"
bucket = started_cluster.minio_restricted_bucket
instance = started_cluster.instances["instance"]
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
instance.query(
f"""
DROP TABLE IF EXISTS test.s3_queue;
CREATE TABLE test.s3_queue ({table_format})
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
SETTINGS
mode = 'ordered',
keeper_path = '/clickhouse/test_meta';
"""
)
# check mode
failed = False
try:
instance.query(
f"""
CREATE TABLE test.s3_queue_copy ({table_format})
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
SETTINGS
mode = 'unordered',
keeper_path = '/clickhouse/test_meta';
"""
)
except QueryRuntimeException as e:
assert "Existing table metadata in ZooKeeper differs in engine mode" in str(e)
failed = True
assert failed is True
    # check columns
    failed = False
    table_format_copy = table_format + ", column4 UInt32"
try:
instance.query(
f"""
CREATE TABLE test.s3_queue_copy ({table_format_copy})
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
SETTINGS
mode = 'ordered',
keeper_path = '/clickhouse/test_meta';
"""
)
except QueryRuntimeException as e:
assert (
"Table columns structure in ZooKeeper is different from local table structure"
in str(e)
)
failed = True
assert failed is True
    # check format
    failed = False
try:
instance.query(
f"""
CREATE TABLE test.s3_queue_copy ({table_format})
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'TSV')
SETTINGS
mode = 'ordered',
keeper_path = '/clickhouse/test_meta';
"""
)
except QueryRuntimeException as e:
assert "Existing table metadata in ZooKeeper differs in format name" in str(e)
failed = True
assert failed is True
# create working engine
instance.query(
f"""
CREATE TABLE test.s3_queue_copy ({table_format})
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
SETTINGS
mode = 'ordered',
keeper_path = '/clickhouse/test_meta';
"""
)
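

# With s3queue_tracked_file_ttl_sec set, tracked files expire after the TTL:
# the same files are no longer treated as processed and are read again.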
def test_max_set_age(started_cluster):
files_to_generate = 10
max_age = 1
prefix = f"test_multiple"
bucket = started_cluster.minio_restricted_bucket
instance = started_cluster.instances["instance"]
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
instance.query(
f"""
DROP TABLE IF EXISTS test.s3_queue;
CREATE TABLE test.s3_queue ({table_format})
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
SETTINGS
mode = 'unordered',
keeper_path = '/clickhouse/test_set_age',
s3queue_tracked_files_limit = 10,
s3queue_tracked_file_ttl_sec = {max_age};
"""
)
total_values = generate_random_files(
files_to_generate, prefix, started_cluster, bucket, row_num=1
)
get_query = f"SELECT * FROM test.s3_queue"
res1 = [
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
]
assert res1 == total_values
time.sleep(max_age + 1)
get_query = f"SELECT * FROM test.s3_queue"
res1 = [
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
]
assert res1 == total_values
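

# Three S3Queue tables sharing one keeper_path act as competing consumers on
# a single server: together they must process every file exactly once.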
@pytest.mark.parametrize("mode", AVAILABLE_MODES)
def test_multiple_tables_streaming_sync(started_cluster, mode):
files_to_generate = 300
poll_size = 30
prefix = f"test_multiple_{mode}"
bucket = started_cluster.minio_restricted_bucket
instance = started_cluster.instances["instance"]
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
instance.query(
f"""
DROP TABLE IF EXISTS test.s3_queue;
DROP TABLE IF EXISTS test.s3_queue_copy;
DROP TABLE IF EXISTS test.s3_queue_copy_2;
DROP TABLE IF EXISTS test.s3_queue_persistent;
DROP TABLE IF EXISTS test.s3_queue_persistent_copy;
DROP TABLE IF EXISTS test.s3_queue_persistent_copy_2;
DROP TABLE IF EXISTS test.persistent_s3_queue_mv;
DROP TABLE IF EXISTS test.persistent_s3_queue_mv_copy;
DROP TABLE IF EXISTS test.persistent_s3_queue_mv_copy_2;
CREATE TABLE test.s3_queue ({table_format})
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
SETTINGS
mode = '{mode}',
keeper_path = '/clickhouse/test_multiple_consumers_sync_{mode}',
s3queue_polling_size = {poll_size};
CREATE TABLE test.s3_queue_copy ({table_format})
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
SETTINGS
mode = '{mode}',
keeper_path = '/clickhouse/test_multiple_consumers_sync_{mode}',
s3queue_polling_size = {poll_size};
CREATE TABLE test.s3_queue_copy_2 ({table_format})
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
SETTINGS
mode = '{mode}',
keeper_path = '/clickhouse/test_multiple_consumers_sync_{mode}',
s3queue_polling_size = {poll_size};
CREATE TABLE test.s3_queue_persistent ({table_format})
ENGINE = MergeTree()
ORDER BY column1;
CREATE TABLE test.s3_queue_persistent_copy ({table_format})
ENGINE = MergeTree()
ORDER BY column1;
CREATE TABLE test.s3_queue_persistent_copy_2 ({table_format})
ENGINE = MergeTree()
ORDER BY column1;
CREATE MATERIALIZED VIEW test.persistent_s3_queue_mv TO test.s3_queue_persistent AS
SELECT
*
FROM test.s3_queue;
CREATE MATERIALIZED VIEW test.persistent_s3_queue_mv_copy TO test.s3_queue_persistent_copy AS
SELECT
*
FROM test.s3_queue_copy;
CREATE MATERIALIZED VIEW test.persistent_s3_queue_mv_copy_2 TO test.s3_queue_persistent_copy_2 AS
SELECT
*
FROM test.s3_queue_copy_2;
"""
)
total_values = generate_random_files(
files_to_generate, prefix, started_cluster, bucket, row_num=1
)
def get_count(table_name):
return int(run_query(instance, f"SELECT count() FROM {table_name}"))
for _ in range(100):
if (
get_count("test.s3_queue_persistent")
+ get_count("test.s3_queue_persistent_copy")
+ get_count("test.s3_queue_persistent_copy_2")
) == files_to_generate:
break
time.sleep(1)
get_query = f"SELECT * FROM test.s3_queue_persistent"
res1 = [
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
]
get_query_copy = f"SELECT * FROM test.s3_queue_persistent_copy"
res2 = [
list(map(int, l.split()))
for l in run_query(instance, get_query_copy).splitlines()
]
get_query_copy_2 = f"SELECT * FROM test.s3_queue_persistent_copy_2"
res3 = [
list(map(int, l.split()))
for l in run_query(instance, get_query_copy_2).splitlines()
]
assert {tuple(v) for v in res1 + res2 + res3} == set(
[tuple(i) for i in total_values]
)
# Checking that all files were processed only once
time.sleep(10)
assert (
get_count("test.s3_queue_persistent")
+ get_count("test.s3_queue_persistent_copy")
+ get_count("test.s3_queue_persistent_copy_2")
) == files_to_generate
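

# The same exactly-once check with consumers spread across two servers
# coordinated through ZooKeeper; both instances must make progress.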
@pytest.mark.parametrize("mode", AVAILABLE_MODES)
def test_multiple_tables_streaming_sync_distributed(started_cluster, mode):
files_to_generate = 100
poll_size = 2
prefix = f"test_multiple_{mode}"
bucket = started_cluster.minio_restricted_bucket
instance = started_cluster.instances["instance"]
instance_2 = started_cluster.instances["instance2"]
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
for inst in [instance, instance_2]:
inst.query(
f"""
DROP TABLE IF EXISTS test.s3_queue;
DROP TABLE IF EXISTS test.s3_queue_persistent;
DROP TABLE IF EXISTS test.persistent_s3_queue_mv;
CREATE TABLE test.s3_queue ({table_format})
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
SETTINGS
mode = '{mode}',
keeper_path = '/clickhouse/test_multiple_consumers_{mode}',
s3queue_polling_size = {poll_size};
CREATE TABLE test.s3_queue_persistent ({table_format})
ENGINE = MergeTree()
ORDER BY column1;
"""
)
for inst in [instance, instance_2]:
inst.query(
f"""
CREATE MATERIALIZED VIEW test.persistent_s3_queue_mv TO test.s3_queue_persistent AS
SELECT
*
FROM test.s3_queue;
"""
)
total_values = generate_random_files(
files_to_generate, prefix, started_cluster, bucket, row_num=1
)
def get_count(node, table_name):
return int(run_query(node, f"SELECT count() FROM {table_name}"))
for _ in range(150):
if (
get_count(instance, "test.s3_queue_persistent")
+ get_count(instance_2, "test.s3_queue_persistent")
) == files_to_generate:
break
time.sleep(1)
get_query = f"SELECT * FROM test.s3_queue_persistent"
res1 = [
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
]
res2 = [
list(map(int, l.split())) for l in run_query(instance_2, get_query).splitlines()
]
assert len(res1) + len(res2) == files_to_generate
# Checking that all engines have made progress
assert len(res1) > 0
assert len(res2) > 0
assert {tuple(v) for v in res1 + res2} == set([tuple(i) for i in total_values])
# Checking that all files were processed only once
time.sleep(10)
assert (
get_count(instance, "test.s3_queue_persistent")
+ get_count(instance_2, "test.s3_queue_persistent")
) == files_to_generate
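

# With s3queue_tracked_files_limit one less than the file count, the oldest
# tracked entry appears to be evicted, so on each subsequent SELECT exactly
# one earlier file (the first, then the second) is seen as new again.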
def test_max_set_size(started_cluster):
files_to_generate = 10
prefix = f"test_multiple"
bucket = started_cluster.minio_restricted_bucket
instance = started_cluster.instances["instance"]
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
instance.query(
f"""
DROP TABLE IF EXISTS test.s3_queue;
CREATE TABLE test.s3_queue ({table_format})
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
SETTINGS
mode = 'unordered',
keeper_path = '/clickhouse/test_set_size',
s3queue_tracked_files_limit = {files_to_generate - 1};
"""
)
total_values = generate_random_files(
files_to_generate, prefix, started_cluster, bucket, start_ind=0, row_num=1
)
get_query = f"SELECT * FROM test.s3_queue"
res1 = [
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
]
assert res1 == total_values
get_query = f"SELECT * FROM test.s3_queue"
res1 = [
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
]
assert res1 == [total_values[0]]
get_query = f"SELECT * FROM test.s3_queue"
res1 = [
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
]
assert res1 == [total_values[1]]