ClickHouse/tests/integration/test_storage_s3_queue/test.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

923 lines
27 KiB
Python
Raw Normal View History

2023-05-02 14:27:53 +00:00
import io
2023-05-01 12:50:20 +00:00
import logging
import os
import random
import time
import pytest
2023-05-04 07:04:04 +00:00
from helpers.client import QueryRuntimeException
2023-05-01 12:50:20 +00:00
from helpers.cluster import ClickHouseCluster, ClickHouseInstance
2023-05-10 15:17:26 +00:00
import json
"""
export CLICKHOUSE_TESTS_SERVER_BIN_PATH=/home/sergey/vkr/ClickHouse/build/programs/clickhouse-server
export CLICKHOUSE_TESTS_CLIENT_BIN_PATH=/home/sergey/vkr/ClickHouse/build/programs/clickhouse-client
export CLICKHOUSE_TESTS_ODBC_BRIDGE_BIN_PATH=/home/sergey/vkr/ClickHouse/build/programs/clickhouse-odbc-bridge
export CLICKHOUSE_TESTS_BASE_CONFIG_DIR=/home/sergey/vkr/ClickHouse/programs/server
"""
2023-09-08 13:49:34 +00:00
MINIO_INTERNAL_PORT = 9001
AVAILABLE_MODES = ["unordered", "ordered"]
AUTH = "'minio','minio123',"
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
2023-05-10 15:17:26 +00:00
def prepare_s3_bucket(started_cluster):
# Allows read-write access for bucket without authorization.
bucket_read_write_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Sid": "",
"Effect": "Allow",
"Principal": {"AWS": "*"},
"Action": "s3:GetBucketLocation",
"Resource": "arn:aws:s3:::root",
},
{
"Sid": "",
"Effect": "Allow",
"Principal": {"AWS": "*"},
"Action": "s3:ListBucket",
"Resource": "arn:aws:s3:::root",
},
{
"Sid": "",
"Effect": "Allow",
"Principal": {"AWS": "*"},
"Action": "s3:GetObject",
"Resource": "arn:aws:s3:::root/*",
},
{
"Sid": "",
"Effect": "Allow",
"Principal": {"AWS": "*"},
"Action": "s3:PutObject",
"Resource": "arn:aws:s3:::root/*",
},
{
"Sid": "",
"Effect": "Allow",
"Principal": {"AWS": "*"},
"Action": "s3:DeleteObject",
"Resource": "arn:aws:s3:::root/*",
},
],
}
minio_client = started_cluster.minio_client
minio_client.set_bucket_policy(
started_cluster.minio_bucket, json.dumps(bucket_read_write_policy)
)
started_cluster.minio_restricted_bucket = "{}-with-auth".format(
started_cluster.minio_bucket
)
if minio_client.bucket_exists(started_cluster.minio_restricted_bucket):
minio_client.remove_bucket(started_cluster.minio_restricted_bucket)
minio_client.make_bucket(started_cluster.minio_restricted_bucket)
2023-05-01 12:50:20 +00:00
2023-05-02 14:27:53 +00:00
@pytest.fixture(autouse=True)
def s3_queue_setup_teardown(started_cluster):
instance = started_cluster.instances["instance"]
2023-05-08 07:26:52 +00:00
instance_2 = started_cluster.instances["instance2"]
2023-05-02 14:27:53 +00:00
instance.query("DROP DATABASE IF EXISTS test; CREATE DATABASE test;")
2023-05-08 07:26:52 +00:00
instance_2.query("DROP DATABASE IF EXISTS test; CREATE DATABASE test;")
2023-05-04 07:04:04 +00:00
minio = started_cluster.minio_client
objects = list(
minio.list_objects(started_cluster.minio_restricted_bucket, recursive=True)
)
for obj in objects:
minio.remove_object(started_cluster.minio_restricted_bucket, obj.object_name)
2023-05-02 14:27:53 +00:00
yield # run test
2023-05-01 12:50:20 +00:00
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster = ClickHouseCluster(__file__)
cluster.add_instance(
"instance",
user_configs=["configs/users.xml"],
with_minio=True,
with_zookeeper=True,
2023-09-08 13:49:34 +00:00
main_configs=[
"configs/defaultS3.xml",
"configs/named_collections.xml",
"configs/zookeeper.xml",
2023-10-25 10:30:02 +00:00
"configs/s3queue_log.xml",
2023-09-08 13:49:34 +00:00
],
2023-05-01 12:50:20 +00:00
)
2023-05-08 07:26:52 +00:00
cluster.add_instance(
"instance2",
user_configs=["configs/users.xml"],
with_minio=True,
with_zookeeper=True,
2023-10-25 10:30:02 +00:00
main_configs=[
"configs/defaultS3.xml",
"configs/named_collections.xml",
"configs/s3queue_log.xml",
],
2023-05-08 07:26:52 +00:00
)
2023-05-01 12:50:20 +00:00
logging.info("Starting cluster...")
cluster.start()
logging.info("Cluster started")
prepare_s3_bucket(cluster)
yield cluster
finally:
cluster.shutdown()
def run_query(instance, query, stdin=None, settings=None):
# type: (ClickHouseInstance, str, object, dict) -> str
logging.info("Running query '{}'...".format(query))
result = instance.query(query, stdin=stdin, settings=settings)
logging.info("Query finished")
return result
2023-09-08 13:49:34 +00:00
def generate_random_files(
started_cluster, files_path, count, column_num=3, row_num=10, start_ind=0
):
files = [
(f"{files_path}/test_{i}.csv", i) for i in range(start_ind, start_ind + count)
]
files.sort(key=lambda x: x[0])
2023-09-07 12:37:24 +00:00
2023-09-08 13:49:34 +00:00
print(f"Generating files: {files}")
2023-05-10 15:17:26 +00:00
2023-09-08 13:49:34 +00:00
total_values = []
for filename, i in files:
rand_values = [
[random.randint(0, 1000) for _ in range(column_num)] for _ in range(row_num)
]
total_values += rand_values
values_csv = (
"\n".join((",".join(map(str, row)) for row in rand_values)) + "\n"
).encode()
put_s3_file_content(started_cluster, filename, values_csv)
return total_values
def put_s3_file_content(started_cluster, filename, data):
buf = io.BytesIO(data)
started_cluster.minio_client.put_object(
started_cluster.minio_bucket, filename, buf, len(data)
2023-09-07 12:37:24 +00:00
)
2023-09-08 13:49:34 +00:00
def get_s3_file_content(started_cluster, bucket, filename, decode=True):
# type: (ClickHouseCluster, str, str, bool) -> str
# Returns content of given S3 file as string.
data = started_cluster.minio_client.get_object(bucket, filename)
data_str = b""
for chunk in data.stream():
data_str += chunk
if decode:
return data_str.decode()
return data_str
def create_table(
started_cluster,
node,
table_name,
mode,
files_path,
format="column1 UInt32, column2 UInt32, column3 UInt32",
additional_settings={},
2023-09-14 16:41:31 +00:00
file_format="CSV",
2023-09-08 13:49:34 +00:00
):
settings = {
"s3queue_loading_retries": 0,
"after_processing": "keep",
"keeper_path": f"/clickhouse/test_{table_name}",
"mode": f"{mode}",
}
settings.update(additional_settings)
url = f"http://{started_cluster.minio_host}:{started_cluster.minio_port}/{started_cluster.minio_bucket}/{files_path}/"
node.query(f"DROP TABLE IF EXISTS {table_name}")
create_query = f"""
CREATE TABLE {table_name} ({format})
2023-09-14 16:41:31 +00:00
ENGINE = S3Queue('{url}', {AUTH}'{file_format}')
2023-09-08 13:49:34 +00:00
SETTINGS {",".join((k+"="+repr(v) for k, v in settings.items()))}
"""
node.query(create_query)
def create_mv(
node,
src_table_name,
dst_table_name,
format="column1 UInt32, column2 UInt32, column3 UInt32",
):
mv_name = f"{dst_table_name}_mv"
2023-09-07 12:37:24 +00:00
node.query(
2023-05-10 15:17:26 +00:00
f"""
2023-09-07 12:37:24 +00:00
DROP TABLE IF EXISTS {dst_table_name};
DROP TABLE IF EXISTS {mv_name};
2023-09-08 13:49:34 +00:00
CREATE TABLE {dst_table_name} ({format}, _path String)
2023-09-07 12:37:24 +00:00
ENGINE = MergeTree()
ORDER BY column1;
2023-09-08 13:49:34 +00:00
CREATE MATERIALIZED VIEW {mv_name} TO {dst_table_name} AS SELECT *, _path FROM {src_table_name};
2023-05-10 15:17:26 +00:00
"""
)
2023-09-08 13:49:34 +00:00
@pytest.mark.parametrize("mode", AVAILABLE_MODES)
def test_delete_after_processing(started_cluster, mode):
node = started_cluster.instances["instance"]
table_name = f"test.delete_after_processing_{mode}"
dst_table_name = f"{table_name}_dst"
files_path = f"{table_name}_data"
files_num = 5
row_num = 10
total_values = generate_random_files(
started_cluster, files_path, files_num, row_num=row_num
)
create_table(
started_cluster,
node,
table_name,
mode,
files_path,
additional_settings={"after_processing": "delete"},
)
create_mv(node, table_name, dst_table_name)
2023-09-07 12:37:24 +00:00
expected_count = files_num * row_num
for _ in range(100):
count = int(node.query(f"SELECT count() FROM {dst_table_name}"))
print(f"{count}/{expected_count}")
if count == expected_count:
break
time.sleep(1)
assert int(node.query(f"SELECT count() FROM {dst_table_name}")) == expected_count
assert int(node.query(f"SELECT uniq(_path) FROM {dst_table_name}")) == files_num
2023-05-10 15:17:26 +00:00
assert [
2023-09-07 12:37:24 +00:00
list(map(int, l.split()))
for l in node.query(
f"SELECT column1, column2, column3 FROM {dst_table_name} ORDER BY column1, column2, column3"
).splitlines()
2023-08-23 09:48:08 +00:00
] == sorted(total_values, key=lambda x: (x[0], x[1], x[2]))
2023-09-07 12:37:24 +00:00
2023-05-10 15:17:26 +00:00
minio = started_cluster.minio_client
objects = list(minio.list_objects(started_cluster.minio_bucket, recursive=True))
assert len(objects) == 0
@pytest.mark.parametrize("mode", AVAILABLE_MODES)
def test_failed_retry(started_cluster, mode):
2023-09-08 13:49:34 +00:00
node = started_cluster.instances["instance"]
table_name = f"test.failed_retry_{mode}"
dst_table_name = f"{table_name}_dst"
files_path = f"{table_name}_data"
file_path = f"{files_path}/trash_test.csv"
keeper_path = f"/clickhouse/test_{table_name}"
retries_num = 3
2023-05-10 15:17:26 +00:00
values = [
["failed", 1, 1],
]
values_csv = (
"\n".join((",".join(map(str, row)) for row in values)) + "\n"
).encode()
2023-09-08 13:49:34 +00:00
put_s3_file_content(started_cluster, file_path, values_csv)
create_table(
started_cluster,
node,
table_name,
mode,
files_path,
additional_settings={
"s3queue_loading_retries": retries_num,
"keeper_path": keeper_path,
},
2023-05-10 15:17:26 +00:00
)
2023-09-08 13:49:34 +00:00
create_mv(node, table_name, dst_table_name)
failed_node_path = ""
for _ in range(20):
zk = started_cluster.get_kazoo_client("zoo1")
failed_nodes = zk.get_children(f"{keeper_path}/failed/")
if len(failed_nodes) > 0:
assert len(failed_nodes) == 1
failed_node_path = f"{keeper_path}/failed/{failed_nodes[0]}"
time.sleep(1)
2023-05-10 15:17:26 +00:00
2023-09-08 13:49:34 +00:00
assert failed_node_path != ""
2023-05-10 15:17:26 +00:00
2023-09-08 13:49:34 +00:00
retries = 0
for _ in range(20):
data, stat = zk.get(failed_node_path)
json_data = json.loads(data)
print(f"Failed node metadata: {json_data}")
assert json_data["file_path"] == file_path
retries = int(json_data["retries"])
if retries == retries_num:
break
time.sleep(1)
2023-05-10 15:17:26 +00:00
2023-09-08 13:49:34 +00:00
assert retries == retries_num
assert 0 == int(node.query(f"SELECT count() FROM {dst_table_name}"))
2023-05-10 15:17:26 +00:00
2023-05-02 14:27:53 +00:00
@pytest.mark.parametrize("mode", AVAILABLE_MODES)
def test_direct_select_file(started_cluster, mode):
2023-09-08 13:49:34 +00:00
node = started_cluster.instances["instance"]
table_name = f"test.direct_select_file_{mode}"
keeper_path = f"/clickhouse/test_{table_name}"
files_path = f"{table_name}_data"
file_path = f"{files_path}/test.csv"
2023-05-01 12:50:20 +00:00
values = [
[12549, 2463, 19893],
[64021, 38652, 66703],
[81611, 39650, 83516],
]
values_csv = (
2023-05-02 14:27:53 +00:00
"\n".join((",".join(map(str, row)) for row in values)) + "\n"
2023-05-01 12:50:20 +00:00
).encode()
2023-09-08 13:49:34 +00:00
put_s3_file_content(started_cluster, file_path, values_csv)
for i in range(3):
create_table(
started_cluster,
node,
f"{table_name}_{i + 1}",
mode,
files_path,
additional_settings={
"keeper_path": keeper_path,
},
)
2023-05-01 12:50:20 +00:00
assert [
2023-09-08 13:49:34 +00:00
list(map(int, l.split()))
for l in node.query(f"SELECT * FROM {table_name}_1").splitlines()
2023-05-02 14:27:53 +00:00
] == values
2023-05-01 12:50:20 +00:00
2023-09-08 13:49:34 +00:00
assert [
list(map(int, l.split()))
for l in node.query(f"SELECT * FROM {table_name}_2").splitlines()
] == []
2023-05-02 14:27:53 +00:00
assert [
2023-09-08 13:49:34 +00:00
list(map(int, l.split()))
for l in node.query(f"SELECT * FROM {table_name}_3").splitlines()
2023-05-02 14:27:53 +00:00
] == []
2023-09-08 13:49:34 +00:00
2023-05-02 14:27:53 +00:00
# New table with same zookeeper path
2023-09-08 13:49:34 +00:00
create_table(
started_cluster,
node,
f"{table_name}_4",
mode,
files_path,
additional_settings={
"keeper_path": keeper_path,
},
)
2023-05-02 14:27:53 +00:00
assert [
2023-09-08 13:49:34 +00:00
list(map(int, l.split()))
for l in node.query(f"SELECT * FROM {table_name}_4").splitlines()
2023-05-02 14:27:53 +00:00
] == []
2023-09-08 13:49:34 +00:00
2023-05-02 14:27:53 +00:00
# New table with different zookeeper path
2023-09-08 13:49:34 +00:00
keeper_path = f"/clickhouse/test_{table_name}_{mode}_2"
create_table(
started_cluster,
node,
f"{table_name}_4",
mode,
files_path,
additional_settings={
"keeper_path": keeper_path,
},
2023-05-02 14:27:53 +00:00
)
2023-09-08 13:49:34 +00:00
2023-05-01 12:50:20 +00:00
assert [
2023-09-08 13:49:34 +00:00
list(map(int, l.split()))
for l in node.query(f"SELECT * FROM {table_name}_4").splitlines()
2023-05-02 14:27:53 +00:00
] == values
values = [
[1, 1, 1],
]
values_csv = (
"\n".join((",".join(map(str, row)) for row in values)) + "\n"
).encode()
2023-09-08 13:49:34 +00:00
file_path = f"{files_path}/t.csv"
put_s3_file_content(started_cluster, file_path, values_csv)
2023-05-02 14:27:53 +00:00
if mode == "unordered":
assert [
list(map(int, l.split()))
2023-09-08 13:49:34 +00:00
for l in node.query(f"SELECT * FROM {table_name}_4").splitlines()
2023-05-02 14:27:53 +00:00
] == values
elif mode == "ordered":
assert [
list(map(int, l.split()))
2023-09-08 13:49:34 +00:00
for l in node.query(f"SELECT * FROM {table_name}_4").splitlines()
2023-05-02 14:27:53 +00:00
] == []
@pytest.mark.parametrize("mode", AVAILABLE_MODES)
def test_direct_select_multiple_files(started_cluster, mode):
2023-09-08 13:49:34 +00:00
node = started_cluster.instances["instance"]
table_name = f"direct_select_multiple_files_{mode}"
files_path = f"{table_name}_data"
2023-05-02 14:27:53 +00:00
2023-09-08 13:49:34 +00:00
create_table(started_cluster, node, table_name, mode, files_path)
2023-05-04 07:04:04 +00:00
for i in range(5):
2023-05-02 14:27:53 +00:00
rand_values = [[random.randint(0, 50) for _ in range(3)] for _ in range(10)]
values_csv = (
"\n".join((",".join(map(str, row)) for row in rand_values)) + "\n"
).encode()
2023-09-08 13:49:34 +00:00
file_path = f"{files_path}/test_{i}.csv"
put_s3_file_content(started_cluster, file_path, values_csv)
2023-05-02 14:27:53 +00:00
assert [
list(map(int, l.split()))
2023-09-08 13:49:34 +00:00
for l in node.query(f"SELECT * FROM {table_name}").splitlines()
2023-05-02 14:27:53 +00:00
] == rand_values
2023-09-08 13:49:34 +00:00
total_values = generate_random_files(started_cluster, files_path, 4, start_ind=5)
2023-05-02 14:27:53 +00:00
assert {
2023-09-08 13:49:34 +00:00
tuple(map(int, l.split()))
for l in node.query(f"SELECT * FROM {table_name}").splitlines()
2023-05-02 14:27:53 +00:00
} == set([tuple(i) for i in total_values])
@pytest.mark.parametrize("mode", AVAILABLE_MODES)
def test_streaming_to_view_(started_cluster, mode):
2023-09-08 13:49:34 +00:00
node = started_cluster.instances["instance"]
table_name = f"streaming_to_view_{mode}"
dst_table_name = f"{table_name}_dst"
files_path = f"{table_name}_data"
2023-07-21 15:03:30 +00:00
2023-09-08 13:49:34 +00:00
total_values = generate_random_files(started_cluster, files_path, 10)
create_table(started_cluster, node, table_name, mode, files_path)
create_mv(node, table_name, dst_table_name)
2023-05-02 14:27:53 +00:00
expected_values = set([tuple(i) for i in total_values])
for i in range(10):
selected_values = {
tuple(map(int, l.split()))
2023-09-08 13:49:34 +00:00
for l in node.query(
f"SELECT column1, column2, column3 FROM {dst_table_name}"
).splitlines()
2023-05-02 14:27:53 +00:00
}
2023-09-08 13:49:34 +00:00
if selected_values == expected_values:
2023-05-02 14:27:53 +00:00
break
2023-09-08 13:49:34 +00:00
time.sleep(1)
2023-05-02 14:27:53 +00:00
assert selected_values == expected_values
@pytest.mark.parametrize("mode", AVAILABLE_MODES)
def test_streaming_to_many_views(started_cluster, mode):
2023-09-08 13:49:34 +00:00
node = started_cluster.instances["instance"]
table_name = f"streaming_to_many_views_{mode}"
dst_table_name = f"{table_name}_dst"
keeper_path = f"/clickhouse/test_{table_name}"
files_path = f"{table_name}_data"
for i in range(3):
table = f"{table_name}_{i + 1}"
create_table(
started_cluster,
node,
table,
mode,
files_path,
additional_settings={
"keeper_path": keeper_path,
},
)
create_mv(node, table, dst_table_name)
2023-07-21 15:03:30 +00:00
2023-09-08 13:49:34 +00:00
total_values = generate_random_files(started_cluster, files_path, 5)
2023-05-02 14:27:53 +00:00
expected_values = set([tuple(i) for i in total_values])
2023-09-08 13:49:34 +00:00
def select():
return {
tuple(map(int, l.split()))
for l in node.query(
f"SELECT column1, column2, column3 FROM {dst_table_name}"
).splitlines()
}
for _ in range(20):
if select() == expected_values:
2023-05-02 14:27:53 +00:00
break
2023-09-08 13:49:34 +00:00
time.sleep(1)
assert select() == expected_values
2023-05-04 07:04:04 +00:00
def test_multiple_tables_meta_mismatch(started_cluster):
2023-09-14 16:41:31 +00:00
node = started_cluster.instances["instance"]
table_name = f"multiple_tables_meta_mismatch"
keeper_path = f"/clickhouse/test_{table_name}"
files_path = f"{table_name}_data"
2023-07-21 15:03:30 +00:00
2023-09-14 16:41:31 +00:00
create_table(
started_cluster,
node,
table_name,
"ordered",
files_path,
additional_settings={
"keeper_path": keeper_path,
},
2023-05-04 07:04:04 +00:00
)
# check mode
failed = False
try:
2023-09-14 16:41:31 +00:00
create_table(
started_cluster,
node,
f"{table_name}_copy",
"unordered",
files_path,
additional_settings={
"keeper_path": keeper_path,
},
2023-05-04 07:04:04 +00:00
)
except QueryRuntimeException as e:
2023-09-27 18:10:05 +00:00
assert (
"Metadata with the same `s3queue_zookeeper_path` was already created but with different settings"
in str(e)
)
2023-05-04 07:04:04 +00:00
failed = True
2023-09-14 16:41:31 +00:00
2023-05-04 07:04:04 +00:00
assert failed is True
# check columns
try:
2023-09-14 16:41:31 +00:00
create_table(
started_cluster,
node,
f"{table_name}_copy",
"ordered",
files_path,
format="column1 UInt32, column2 UInt32, column3 UInt32, column4 UInt32",
additional_settings={
"keeper_path": keeper_path,
},
2023-05-04 07:04:04 +00:00
)
except QueryRuntimeException as e:
assert (
2023-05-04 08:58:09 +00:00
"Table columns structure in ZooKeeper is different from local table structure"
in str(e)
2023-05-04 07:04:04 +00:00
)
failed = True
assert failed is True
# check format
try:
2023-09-14 16:41:31 +00:00
create_table(
started_cluster,
node,
f"{table_name}_copy",
"ordered",
files_path,
format="column1 UInt32, column2 UInt32, column3 UInt32, column4 UInt32",
additional_settings={
"keeper_path": keeper_path,
},
file_format="TSV",
2023-05-04 07:04:04 +00:00
)
except QueryRuntimeException as e:
assert "Existing table metadata in ZooKeeper differs in format name" in str(e)
failed = True
2023-09-14 16:41:31 +00:00
2023-05-04 07:04:04 +00:00
assert failed is True
# create working engine
2023-09-14 16:41:31 +00:00
create_table(
started_cluster,
node,
f"{table_name}_copy",
"ordered",
files_path,
additional_settings={
"keeper_path": keeper_path,
},
2023-05-08 07:26:52 +00:00
)
2023-11-06 13:43:10 +00:00
# TODO: Update the modes for this test to include "ordered" once PR #55795 is finished.
@pytest.mark.parametrize("mode", ["unordered"])
2023-05-08 07:26:52 +00:00
def test_multiple_tables_streaming_sync(started_cluster, mode):
2023-09-14 16:41:31 +00:00
node = started_cluster.instances["instance"]
table_name = f"multiple_tables_streaming_sync_{mode}"
dst_table_name = f"{table_name}_dst"
keeper_path = f"/clickhouse/test_{table_name}"
files_path = f"{table_name}_data"
2023-05-08 07:26:52 +00:00
files_to_generate = 300
2023-09-14 16:41:31 +00:00
for i in range(3):
table = f"{table_name}_{i + 1}"
dst_table = f"{dst_table_name}_{i + 1}"
create_table(
started_cluster,
node,
table,
mode,
files_path,
additional_settings={
"keeper_path": keeper_path,
},
)
create_mv(node, table, dst_table)
2023-05-08 07:26:52 +00:00
total_values = generate_random_files(
2023-09-14 16:41:31 +00:00
started_cluster, files_path, files_to_generate, row_num=1
2023-05-08 07:26:52 +00:00
)
2023-07-21 15:03:30 +00:00
def get_count(table_name):
2023-09-14 16:41:31 +00:00
return int(run_query(node, f"SELECT count() FROM {table_name}"))
2023-07-21 15:03:30 +00:00
for _ in range(100):
if (
2023-09-14 16:41:31 +00:00
get_count(f"{dst_table_name}_1")
+ get_count(f"{dst_table_name}_2")
+ get_count(f"{dst_table_name}_3")
2023-07-21 15:03:30 +00:00
) == files_to_generate:
break
time.sleep(1)
2023-05-08 07:26:52 +00:00
2023-10-25 10:30:02 +00:00
if (
get_count(f"{dst_table_name}_1")
+ get_count(f"{dst_table_name}_2")
+ get_count(f"{dst_table_name}_3")
) != files_to_generate:
info = node.query(
2023-11-06 11:21:13 +00:00
f"SELECT * FROM system.s3queue WHERE zookeeper_path like '%{table_name}' ORDER BY file_name FORMAT Vertical"
2023-10-25 10:30:02 +00:00
)
logging.debug(info)
assert False
2023-05-08 07:26:52 +00:00
res1 = [
2023-09-14 16:41:31 +00:00
list(map(int, l.split()))
for l in node.query(
f"SELECT column1, column2, column3 FROM {dst_table_name}_1"
).splitlines()
2023-05-08 07:26:52 +00:00
]
res2 = [
list(map(int, l.split()))
2023-09-14 16:41:31 +00:00
for l in node.query(
f"SELECT column1, column2, column3 FROM {dst_table_name}_2"
).splitlines()
2023-05-08 07:26:52 +00:00
]
res3 = [
list(map(int, l.split()))
2023-09-14 16:41:31 +00:00
for l in node.query(
f"SELECT column1, column2, column3 FROM {dst_table_name}_3"
).splitlines()
2023-05-08 07:26:52 +00:00
]
assert {tuple(v) for v in res1 + res2 + res3} == set(
[tuple(i) for i in total_values]
)
2023-07-21 15:03:30 +00:00
# Checking that all files were processed only once
time.sleep(10)
assert (
2023-09-14 16:41:31 +00:00
get_count(f"{dst_table_name}_1")
+ get_count(f"{dst_table_name}_2")
+ get_count(f"{dst_table_name}_3")
2023-07-21 15:03:30 +00:00
) == files_to_generate
2023-05-08 07:26:52 +00:00
@pytest.mark.parametrize("mode", AVAILABLE_MODES)
def test_multiple_tables_streaming_sync_distributed(started_cluster, mode):
2023-09-14 16:41:31 +00:00
node = started_cluster.instances["instance"]
node_2 = started_cluster.instances["instance2"]
table_name = f"multiple_tables_streaming_sync_distributed_{mode}"
dst_table_name = f"{table_name}_dst"
keeper_path = f"/clickhouse/test_{table_name}"
files_path = f"{table_name}_data"
files_to_generate = 300
2023-11-20 13:55:23 +00:00
row_num = 50
total_rows = row_num * files_to_generate
2023-05-08 07:26:52 +00:00
2023-09-14 16:41:31 +00:00
for instance in [node, node_2]:
create_table(
started_cluster,
instance,
table_name,
mode,
files_path,
additional_settings={
"keeper_path": keeper_path,
},
2023-08-02 15:09:47 +00:00
)
2023-07-21 15:03:30 +00:00
2023-09-14 16:41:31 +00:00
for instance in [node, node_2]:
create_mv(instance, table_name, dst_table_name)
2023-05-08 07:26:52 +00:00
total_values = generate_random_files(
2023-11-20 13:55:23 +00:00
started_cluster, files_path, files_to_generate, row_num=row_num
2023-05-08 07:26:52 +00:00
)
2023-07-30 11:27:01 +00:00
def get_count(node, table_name):
return int(run_query(node, f"SELECT count() FROM {table_name}"))
2023-08-02 15:09:47 +00:00
for _ in range(150):
2023-07-30 11:27:01 +00:00
if (
2023-09-14 16:41:31 +00:00
get_count(node, dst_table_name) + get_count(node_2, dst_table_name)
2023-11-20 13:55:23 +00:00
) == total_rows:
2023-07-30 11:27:01 +00:00
break
time.sleep(1)
2023-10-25 10:30:02 +00:00
if (
get_count(node, dst_table_name) + get_count(node_2, dst_table_name)
2023-11-20 13:55:23 +00:00
) != total_rows:
2023-10-25 10:30:02 +00:00
info = node.query(
2023-11-06 11:21:13 +00:00
f"SELECT * FROM system.s3queue WHERE zookeeper_path like '%{table_name}' ORDER BY file_name FORMAT Vertical"
2023-10-25 10:30:02 +00:00
)
logging.debug(info)
assert False
2023-09-14 16:41:31 +00:00
get_query = f"SELECT column1, column2, column3 FROM {dst_table_name}"
res1 = [list(map(int, l.split())) for l in run_query(node, get_query).splitlines()]
2023-05-08 07:26:52 +00:00
res2 = [
2023-09-14 16:41:31 +00:00
list(map(int, l.split())) for l in run_query(node_2, get_query).splitlines()
2023-05-08 07:26:52 +00:00
]
2023-11-20 13:55:23 +00:00
assert len(res1) + len(res2) == total_rows
2023-08-02 15:09:47 +00:00
2023-05-08 07:26:52 +00:00
# Checking that all engines have made progress
assert len(res1) > 0
assert len(res2) > 0
assert {tuple(v) for v in res1 + res2} == set([tuple(i) for i in total_values])
2023-07-30 11:27:01 +00:00
# Checking that all files were processed only once
time.sleep(10)
assert (
2023-09-14 16:41:31 +00:00
get_count(node, dst_table_name) + get_count(node_2, dst_table_name)
2023-11-20 13:55:23 +00:00
) == total_rows
2023-07-30 11:27:01 +00:00
2023-05-08 07:26:52 +00:00
2023-09-14 16:41:31 +00:00
def test_max_set_age(started_cluster):
node = started_cluster.instances["instance"]
table_name = f"max_set_age"
2023-09-15 12:21:08 +00:00
dst_table_name = f"{table_name}_dst"
2023-09-14 16:41:31 +00:00
keeper_path = f"/clickhouse/test_{table_name}"
files_path = f"{table_name}_data"
2023-09-15 12:21:08 +00:00
max_age = 10
2023-09-14 16:41:31 +00:00
files_to_generate = 10
create_table(
started_cluster,
node,
table_name,
"unordered",
files_path,
additional_settings={
"keeper_path": keeper_path,
"s3queue_tracked_file_ttl_sec": max_age,
2023-09-25 11:44:45 +00:00
"s3queue_cleanup_interval_min_ms": 0,
"s3queue_cleanup_interval_max_ms": 0,
2023-09-14 16:41:31 +00:00
},
)
2023-09-15 12:21:08 +00:00
create_mv(node, table_name, dst_table_name)
2023-09-14 16:41:31 +00:00
total_values = generate_random_files(
started_cluster, files_path, files_to_generate, row_num=1
)
2023-09-15 12:21:08 +00:00
expected_rows = 10
node.wait_for_log_line("Checking node limits")
node.wait_for_log_line("Node limits check finished")
def get_count():
return int(node.query(f"SELECT count() FROM {dst_table_name}"))
for _ in range(20):
if expected_rows == get_count():
break
time.sleep(1)
assert expected_rows == get_count()
assert 10 == int(node.query(f"SELECT uniq(_path) from {dst_table_name}"))
2023-09-14 16:41:31 +00:00
time.sleep(max_age + 1)
2023-09-15 12:21:08 +00:00
expected_rows = 20
for _ in range(20):
if expected_rows == get_count():
break
time.sleep(1)
assert expected_rows == get_count()
assert 10 == int(node.query(f"SELECT uniq(_path) from {dst_table_name}"))
paths_count = [
int(x)
for x in node.query(
f"SELECT count() from {dst_table_name} GROUP BY _path"
).splitlines()
2023-09-14 16:41:31 +00:00
]
2023-09-15 12:21:08 +00:00
assert 10 == len(paths_count)
for path_count in paths_count:
assert 2 == path_count
2023-09-14 16:41:31 +00:00
2023-05-08 07:26:52 +00:00
def test_max_set_size(started_cluster):
2023-09-15 12:21:08 +00:00
node = started_cluster.instances["instance"]
table_name = f"max_set_size"
dst_table_name = f"{table_name}_dst"
keeper_path = f"/clickhouse/test_{table_name}"
files_path = f"{table_name}_data"
max_age = 10
2023-05-08 07:26:52 +00:00
files_to_generate = 10
2023-09-15 12:21:08 +00:00
create_table(
started_cluster,
node,
table_name,
"unordered",
files_path,
additional_settings={
"keeper_path": keeper_path,
"s3queue_tracked_files_limit": 9,
2023-09-25 11:44:45 +00:00
"s3queue_cleanup_interval_min_ms": 0,
"s3queue_cleanup_interval_max_ms": 0,
2023-09-27 16:44:53 +00:00
"s3queue_processing_threads_num": 1,
2023-09-15 12:21:08 +00:00
},
2023-05-08 07:26:52 +00:00
)
total_values = generate_random_files(
2023-09-15 12:21:08 +00:00
started_cluster, files_path, files_to_generate, start_ind=0, row_num=1
2023-05-08 07:26:52 +00:00
)
2023-09-15 12:21:08 +00:00
2023-09-27 16:44:53 +00:00
get_query = f"SELECT * FROM {table_name} ORDER BY column1, column2, column3"
2023-09-15 12:21:08 +00:00
res1 = [list(map(int, l.split())) for l in run_query(node, get_query).splitlines()]
2023-09-27 16:44:53 +00:00
assert res1 == sorted(total_values, key=lambda x: (x[0], x[1], x[2]))
2023-09-15 12:21:08 +00:00
print(total_values)
2023-05-08 07:26:52 +00:00
2023-09-15 12:21:08 +00:00
time.sleep(10)
zk = started_cluster.get_kazoo_client("zoo1")
processed_nodes = zk.get_children(f"{keeper_path}/processed/")
assert len(processed_nodes) == 9
res1 = [list(map(int, l.split())) for l in run_query(node, get_query).splitlines()]
2023-05-08 07:26:52 +00:00
assert res1 == [total_values[0]]
2023-09-15 12:21:08 +00:00
time.sleep(10)
res1 = [list(map(int, l.split())) for l in run_query(node, get_query).splitlines()]
2023-05-08 07:26:52 +00:00
assert res1 == [total_values[1]]
2023-11-06 14:46:29 +00:00
def test_drop_table(started_cluster):
node = started_cluster.instances["instance"]
table_name = f"test_drop"
dst_table_name = f"{table_name}_dst"
keeper_path = f"/clickhouse/test_{table_name}"
files_path = f"{table_name}_data"
files_to_generate = 300
create_table(
started_cluster,
node,
table_name,
"unordered",
files_path,
additional_settings={
"keeper_path": keeper_path,
2023-11-08 11:29:40 +00:00
"s3queue_processing_threads_num": 5,
2023-11-06 14:46:29 +00:00
},
)
total_values = generate_random_files(
2023-11-08 11:29:40 +00:00
started_cluster, files_path, files_to_generate, start_ind=0, row_num=100000
2023-11-06 14:46:29 +00:00
)
create_mv(node, table_name, dst_table_name)
2023-11-09 12:04:46 +00:00
node.wait_for_log_line(f"Reading from file: test_drop_data")
2023-11-06 14:46:29 +00:00
node.query(f"DROP TABLE {table_name} SYNC")
2023-11-08 11:48:09 +00:00
assert node.contains_in_log(
f"StorageS3Queue ({table_name}): Table is being dropped"
)