ClickHouse/tests/integration/test_storage_s3_queue/test.py
Julia Kartseva d6ecabb41d Fix flaky test_storage_s3_queue/test.py::test_multiple_tables_streaming_sync_distributed
Disable parallel processing for the Ordered mode for the
test_storage_s3_queue/test.py::test_multiple_tables_streaming_sync_distributed
test.

The reason for this is that the load between the processing nodes is
too uneven when s3queue_processing_threads_num != 1,
e.g.:

```
$ grep res1 pytest.log
2024-08-07 07:15:58 [ 575 ] DEBUG : res1 size: 13300, res2 size: 1700, total_rows: 15000 (test.py:813, test_multiple_tables_streaming_sync_distributed)
```

In the CI environment, there are rare cases when one of the processors handles all the workload
while the other is busy-waiting, and the test fails on the assertion that both engines have made progress.

When s3queue_processing_threads_num == 1, the workload is evenly
distributed:

```
$ grep res1 pytest.log
2024-08-07 07:26:52 [ 586 ] DEBUG : res1 size: 7200, res2 size: 7800, total_rows: 15000 (test.py:813, test_multiple_tables_streaming_sync_distributed)
```

This change only fixes the test flakiness. Further investigation of the Ordered
mode parallelism is required.
2024-08-08 06:13:36 +00:00

1800 lines
54 KiB
Python

import io
import logging
import random
import time
import pytest
from helpers.client import QueryRuntimeException
from helpers.cluster import ClickHouseCluster, ClickHouseInstance
import json
from uuid import uuid4
# Queue processing modes that the mode-parametrized tests iterate over.
AVAILABLE_MODES = ["unordered", "ordered"]
# Engine arguments for authenticated access; create_table() joins them with ",".
DEFAULT_AUTH = ["'minio'", "'minio123'"]
# Engine argument for unsigned access (used together with the public bucket).
NO_AUTH = ["NOSIGN"]
# Azure blob container created by the cluster fixture and used by the Azure helpers.
AZURE_CONTAINER_NAME = "cont"
def prepare_public_s3_bucket(started_cluster):
    """Create a fresh, publicly accessible MinIO bucket for the cluster.

    The bucket is named ``<minio_bucket>-public`` and its name is stored on
    ``started_cluster.minio_public_bucket``. A bucket with that name that
    already exists is removed first; the bucket is then re-created with a
    policy allowing any principal to locate/list it and to get, put and
    delete its objects.
    """
    client = started_cluster.minio_client
    public_bucket = f"{started_cluster.minio_bucket}-public"
    started_cluster.minio_public_bucket = public_bucket

    bucket_arn = f"arn:aws:s3:::{public_bucket}"
    public_policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                # Bucket-level permissions.
                "Sid": "",
                "Effect": "Allow",
                "Principal": "*",
                "Action": [
                    "s3:GetBucketLocation",
                    "s3:ListBucket",
                ],
                "Resource": bucket_arn,
            },
            {
                # Object-level permissions.
                "Sid": "",
                "Effect": "Allow",
                "Principal": "*",
                "Action": [
                    "s3:GetObject",
                    "s3:PutObject",
                    "s3:DeleteObject",
                ],
                "Resource": f"{bucket_arn}/*",
            },
        ],
    }

    # Start from a clean bucket so repeated calls do not see an old one.
    if client.bucket_exists(public_bucket):
        client.remove_bucket(public_bucket)
    client.make_bucket(public_bucket)
    client.set_bucket_policy(public_bucket, json.dumps(public_policy))
@pytest.fixture(autouse=True)
def s3_queue_setup_teardown(started_cluster):
    """Reset state before every test.

    Re-creates the ``test`` database on ``instance`` and ``instance2`` and
    removes every object from the default MinIO bucket, then yields to the
    test. Nothing is done after the test.
    """
    nodes = [started_cluster.instances[name] for name in ("instance", "instance2")]
    reset_database = "DROP DATABASE IF EXISTS test; CREATE DATABASE test;"
    for node in nodes:
        node.query(reset_database)

    client = started_cluster.minio_client
    # Materialize the listing before deleting anything from the bucket.
    stale_objects = list(
        client.list_objects(started_cluster.minio_bucket, recursive=True)
    )
    for stale in stale_objects:
        client.remove_object(started_cluster.minio_bucket, stale.object_name)

    yield  # run test
@pytest.fixture(scope="module")
def started_cluster():
    """Module-scoped fixture that starts the test cluster and yields it.

    Registers four instances:

    * ``instance`` - MinIO, Azurite and ZooKeeper, with the ZooKeeper and
      s3queue log configs;
    * ``instance2`` - MinIO and ZooKeeper, with the s3queue log config;
    * ``old_instance`` - the ``clickhouse/clickhouse-server:23.12`` image with
      an installed binary and the old analyzer;
    * ``instance_too_many_parts`` - like ``instance2`` plus ``merge_tree.xml``.

    After start-up the Azure container ``AZURE_CONTAINER_NAME`` is created.
    The cluster is shut down when the module's tests are finished, or when
    set-up fails.
    """
    # Build the cluster object outside ``try``: if construction itself fails
    # there is nothing to shut down, and touching an unbound ``cluster`` in
    # ``finally`` would mask the real error.
    cluster = ClickHouseCluster(__file__)
    try:
        cluster.add_instance(
            "instance",
            user_configs=["configs/users.xml"],
            with_minio=True,
            with_azurite=True,
            with_zookeeper=True,
            main_configs=[
                "configs/zookeeper.xml",
                "configs/s3queue_log.xml",
            ],
            stay_alive=True,
        )
        cluster.add_instance(
            "instance2",
            user_configs=["configs/users.xml"],
            with_minio=True,
            with_zookeeper=True,
            main_configs=[
                "configs/s3queue_log.xml",
            ],
            stay_alive=True,
        )
        cluster.add_instance(
            "old_instance",
            with_zookeeper=True,
            image="clickhouse/clickhouse-server",
            tag="23.12",
            stay_alive=True,
            with_installed_binary=True,
            use_old_analyzer=True,
        )
        cluster.add_instance(
            "instance_too_many_parts",
            user_configs=["configs/users.xml"],
            with_minio=True,
            with_zookeeper=True,
            main_configs=[
                "configs/s3queue_log.xml",
                "configs/merge_tree.xml",
            ],
            stay_alive=True,
        )

        logging.info("Starting cluster...")
        cluster.start()
        logging.info("Cluster started")

        # The Azure helpers and tests expect this container to exist.
        container_client = cluster.blob_service_client.get_container_client(
            AZURE_CONTAINER_NAME
        )
        container_client.create_container()

        yield cluster
    finally:
        cluster.shutdown()
def run_query(instance, query, stdin=None, settings=None):
    # type: (ClickHouseInstance, str, object, dict) -> str
    """Execute ``query`` on ``instance`` and return its result.

    An info-level log line is written before and after the query runs.
    ``stdin`` and ``settings`` are forwarded to ``instance.query`` unchanged.
    """
    logging.info(f"Running query '{query}'...")
    output = instance.query(query, stdin=stdin, settings=settings)
    logging.info("Query finished")
    return output
def generate_random_files(
    started_cluster,
    files_path,
    count,
    storage="s3",
    column_num=3,
    row_num=10,
    start_ind=0,
    bucket=None,
):
    """Upload ``count`` CSV files of random integers and return every row.

    Files are named ``{files_path}/test_{i}.csv`` for ``i`` in
    ``[start_ind, start_ind + count)`` and are uploaded in lexicographic order
    of their names. Each file holds ``row_num`` rows of ``column_num``
    integers drawn from ``[0, 1000]``. With ``storage == "s3"`` the upload
    goes through ``put_s3_file_content``; any other value uses
    ``put_azure_file_content``.

    Returns:
        A list of all generated rows (each a list of ints), in upload order.
    """
    files = sorted(
        (
            (f"{files_path}/test_{index}.csv", index)
            for index in range(start_ind, start_ind + count)
        ),
        key=lambda entry: entry[0],
    )
    print(f"Generating files: {files}")

    upload = put_s3_file_content if storage == "s3" else put_azure_file_content

    total_values = []
    for filename, _ in files:
        rows = [
            [random.randint(0, 1000) for _ in range(column_num)]
            for _ in range(row_num)
        ]
        total_values.extend(rows)
        csv_lines = (",".join(str(value) for value in row) for row in rows)
        payload = ("\n".join(csv_lines) + "\n").encode()
        upload(started_cluster, filename, payload, bucket)
    return total_values
def put_s3_file_content(started_cluster, filename, data, bucket=None):
    """Store ``data`` as object ``filename`` in MinIO.

    ``bucket`` defaults to ``started_cluster.minio_bucket`` when ``None``.
    """
    if bucket is None:
        bucket = started_cluster.minio_bucket
    stream = io.BytesIO(data)
    started_cluster.minio_client.put_object(bucket, filename, stream, len(data))
def put_azure_file_content(started_cluster, filename, data, bucket=None):
    """Upload ``data`` as block blob ``filename`` into ``AZURE_CONTAINER_NAME``.

    ``bucket`` is accepted so the signature matches ``put_s3_file_content``,
    but the body does not use it.
    """
    blob_client = started_cluster.blob_service_client.get_blob_client(
        AZURE_CONTAINER_NAME, filename
    )
    stream = io.BytesIO(data)
    blob_client.upload_blob(stream, "BlockBlob", len(data))
def create_table(
    started_cluster,
    node,
    table_name,
    mode,
    files_path,
    engine_name="S3Queue",
    format="column1 UInt32, column2 UInt32, column3 UInt32",
    additional_settings=None,
    file_format="CSV",
    auth=DEFAULT_AUTH,
    bucket=None,
    expect_error=False,
):
    """Drop and re-create a queue table on ``node``.

    Args:
        started_cluster: cluster object providing the MinIO host, port and
            bucket, and ``env_variables``.
        node: instance on which the statements are executed.
        table_name: table to (re)create; also used to derive the default
            ``keeper_path`` setting.
        mode: value of the ``mode`` engine setting.
        files_path: key prefix the queue reads from.
        engine_name: ``"S3Queue"`` builds an S3 URL engine definition; any
            other value builds one from the Azurite connection string.
        format: column definitions of the table.
        additional_settings: optional mapping of engine settings that
            override or extend the defaults; ``None`` means no extras.
        file_format: data format passed to the S3Queue engine.
        auth: engine arguments used for authentication, joined with ",".
        bucket: bucket name; ``None`` means ``started_cluster.minio_bucket``.
        expect_error: when true, run the CREATE statement with
            ``node.query_and_get_error`` and return its result.

    Returns:
        The result of ``node.query_and_get_error`` when ``expect_error`` is
        set, otherwise ``None``.
    """
    auth_params = ",".join(auth)
    if bucket is None:
        bucket = started_cluster.minio_bucket

    settings = {
        "s3queue_loading_retries": 0,
        "after_processing": "keep",
        "keeper_path": f"/clickhouse/test_{table_name}",
        "mode": f"{mode}",
    }
    # ``None`` (the default) replaces the former shared mutable ``{}`` default.
    if additional_settings:
        settings.update(additional_settings)

    if engine_name == "S3Queue":
        url = f"http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{files_path}/"
        engine_def = f"{engine_name}('{url}', {auth_params}, {file_format})"
    else:
        # NOTE: this branch does not use ``auth``, ``bucket`` or
        # ``file_format``; the container name and format are hard-coded.
        engine_def = f"{engine_name}('{started_cluster.env_variables['AZURITE_CONNECTION_STRING']}', 'cont', '{files_path}/', 'CSV')"

    node.query(f"DROP TABLE IF EXISTS {table_name}")

    # Values are rendered with repr() so strings come out quoted.
    settings_clause = ",".join(f"{key}={value!r}" for key, value in settings.items())
    create_query = (
        f"\nCREATE TABLE {table_name} ({format})\n"
        f"ENGINE = {engine_def}\n"
        f"SETTINGS {settings_clause}\n"
    )

    if expect_error:
        return node.query_and_get_error(create_query)

    node.query(create_query)
def create_mv(
    node,
    src_table_name,
    dst_table_name,
    format="column1 UInt32, column2 UInt32, column3 UInt32",
):
    """(Re)create a MergeTree destination table fed by a materialized view.

    Drops ``dst_table_name`` and the view ``{dst_table_name}_mv`` if they
    exist, creates the destination with columns ``format`` plus a
    ``_path String`` column, and creates the view selecting ``*, _path`` from
    ``src_table_name`` into it. All statements are sent in one ``node.query``
    call.
    """
    mv_name = f"{dst_table_name}_mv"
    script_lines = (
        f"DROP TABLE IF EXISTS {dst_table_name};",
        f"DROP TABLE IF EXISTS {mv_name};",
        f"CREATE TABLE {dst_table_name} ({format}, _path String)",
        "ENGINE = MergeTree()",
        "ORDER BY column1;",
        f"CREATE MATERIALIZED VIEW {mv_name} TO {dst_table_name} AS SELECT *, _path FROM {src_table_name};",
    )
    node.query("\n" + "\n".join(script_lines) + "\n")
@pytest.mark.parametrize("mode", ["unordered", "ordered"])
@pytest.mark.parametrize("engine_name", ["S3Queue", "AzureQueue"])
def test_delete_after_processing(started_cluster, mode, engine_name):
    """With ``after_processing=delete`` processed files disappear from storage.

    Generates 5 files of 10 rows, streams them through a materialized view,
    checks row count, distinct paths and the exact values, and finally
    verifies that no object (S3) or blob under the prefix (Azure) is left.
    """
    node = started_cluster.instances["instance"]
    table_name = f"test.delete_after_processing_{mode}_{engine_name}"
    dst_table_name = f"{table_name}_dst"
    files_path = f"{table_name}_data"
    files_num = 5
    row_num = 10
    storage = "s3" if engine_name == "S3Queue" else "azure"

    total_values = generate_random_files(
        started_cluster, files_path, files_num, row_num=row_num, storage=storage
    )
    create_table(
        started_cluster,
        node,
        table_name,
        mode,
        files_path,
        additional_settings={"after_processing": "delete"},
        engine_name=engine_name,
    )
    create_mv(node, table_name, dst_table_name)

    def loaded_rows():
        return int(node.query(f"SELECT count() FROM {dst_table_name}"))

    expected_count = files_num * row_num
    # Poll once per second, up to 100 times, until all rows have arrived.
    for _ in range(100):
        count = loaded_rows()
        print(f"{count}/{expected_count}")
        if count == expected_count:
            break
        time.sleep(1)

    assert loaded_rows() == expected_count
    assert int(node.query(f"SELECT uniq(_path) FROM {dst_table_name}")) == files_num

    stored = node.query(
        f"SELECT column1, column2, column3 FROM {dst_table_name} ORDER BY column1, column2, column3"
    )
    stored_rows = [[int(cell) for cell in line.split()] for line in stored.splitlines()]
    assert stored_rows == sorted(
        total_values, key=lambda row: (row[0], row[1], row[2])
    )

    if engine_name == "S3Queue":
        leftovers = list(
            started_cluster.minio_client.list_objects(
                started_cluster.minio_bucket, recursive=True
            )
        )
        assert len(leftovers) == 0
    else:
        container = started_cluster.blob_service_client.get_container_client(
            AZURE_CONTAINER_NAME
        )
        # Any blob still listed under the prefix means deletion did not happen.
        for _ in container.list_blobs(files_path):
            assert False
@pytest.mark.parametrize("mode", ["unordered", "ordered"])
@pytest.mark.parametrize("engine_name", ["S3Queue", "AzureQueue"])
def test_failed_retry(started_cluster, mode, engine_name):
    """A file that cannot be parsed is retried ``s3queue_loading_retries`` times.

    Uploads a single CSV whose first column is the string ``failed`` (the
    table columns are UInt32), then checks that exactly one node appears
    under ``{keeper_path}/failed/``, that its ``retries`` counter reaches the
    configured number, and that no row reached the destination table.
    """
    node = started_cluster.instances["instance"]
    table_name = f"test.failed_retry_{mode}_{engine_name}"
    dst_table_name = f"{table_name}_dst"
    files_path = f"{table_name}_data"
    file_path = f"{files_path}/trash_test.csv"
    keeper_path = f"/clickhouse/test_{table_name}"
    retries_num = 3

    values = [
        ["failed", 1, 1],
    ]
    values_csv = (
        "\n".join((",".join(map(str, row)) for row in values)) + "\n"
    ).encode()
    if engine_name == "S3Queue":
        put_s3_file_content(started_cluster, file_path, values_csv)
    else:
        put_azure_file_content(started_cluster, file_path, values_csv)

    create_table(
        started_cluster,
        node,
        table_name,
        mode,
        files_path,
        additional_settings={
            "s3queue_loading_retries": retries_num,
            "keeper_path": keeper_path,
        },
        engine_name=engine_name,
    )
    create_mv(node, table_name, dst_table_name)

    # Request the ZooKeeper client once instead of on every poll iteration.
    zk = started_cluster.get_kazoo_client("zoo1")

    failed_node_path = ""
    # NOTE(review): this loop never breaks, so it always runs for the full
    # 20 seconds. That is kept on purpose here because the wait also gives
    # the retries time to accumulate before the next loop - confirm before
    # shortening it.
    for _ in range(20):
        failed_nodes = zk.get_children(f"{keeper_path}/failed/")
        if failed_nodes:
            assert len(failed_nodes) == 1
            failed_node_path = f"{keeper_path}/failed/{failed_nodes[0]}"
        time.sleep(1)

    assert failed_node_path != ""

    retries = 0
    # Poll the failed node's metadata until the retry counter is exhausted.
    for _ in range(20):
        data, stat = zk.get(failed_node_path)
        json_data = json.loads(data)
        print(f"Failed node metadata: {json_data}")
        assert json_data["file_path"] == file_path
        retries = int(json_data["retries"])
        if retries == retries_num:
            break
        time.sleep(1)

    assert retries == retries_num
    assert 0 == int(node.query(f"SELECT count() FROM {dst_table_name}"))
@pytest.mark.parametrize("mode", AVAILABLE_MODES)
def test_direct_select_file(started_cluster, mode):
    """Direct SELECTs from queue tables consume each file once per keeper path.

    Three tables share one keeper path: only the first SELECT returns the
    file's rows. A fourth table on the same path sees nothing, while a table
    on a new keeper path sees the file again. A second file ``t.csv`` is then
    returned in unordered mode but not in ordered mode.
    """
    node = started_cluster.instances["instance"]
    table_name = f"test.direct_select_file_{mode}"
    keeper_path = f"/clickhouse/test_{table_name}"
    files_path = f"{table_name}_data"
    file_path = f"{files_path}/test.csv"

    def to_csv(rows):
        return ("\n".join(",".join(str(cell) for cell in row) for row in rows) + "\n").encode()

    def make_table(index, zk_path):
        create_table(
            started_cluster,
            node,
            f"{table_name}_{index}",
            mode,
            files_path,
            additional_settings={
                "keeper_path": zk_path,
                "s3queue_processing_threads_num": 1,
            },
        )

    def read_rows(index):
        output = node.query(f"SELECT * FROM {table_name}_{index}")
        return [[int(cell) for cell in line.split()] for line in output.splitlines()]

    values = [
        [12549, 2463, 19893],
        [64021, 38652, 66703],
        [81611, 39650, 83516],
    ]
    put_s3_file_content(started_cluster, file_path, to_csv(values))

    for index in (1, 2, 3):
        make_table(index, keeper_path)

    # Only the first reader gets the data; the file is then marked as handled.
    assert read_rows(1) == values
    assert read_rows(2) == []
    assert read_rows(3) == []

    # New table with same zookeeper path
    make_table(4, keeper_path)
    assert read_rows(4) == []

    # New table with different zookeeper path
    keeper_path = f"/clickhouse/test_{table_name}_{mode}_2"
    make_table(4, keeper_path)
    assert read_rows(4) == values

    values = [
        [1, 1, 1],
    ]
    file_path = f"{files_path}/t.csv"
    put_s3_file_content(started_cluster, file_path, to_csv(values))

    if mode == "unordered":
        assert read_rows(4) == values
    elif mode == "ordered":
        assert read_rows(4) == []
@pytest.mark.parametrize("mode", AVAILABLE_MODES)
def test_direct_select_multiple_files(started_cluster, mode):
    """Each direct SELECT returns exactly the files added since the last one.

    Uploads five files one at a time, selecting after each upload, then
    uploads four more at once and checks that a single SELECT returns all of
    their rows.
    """
    node = started_cluster.instances["instance"]
    table_name = f"direct_select_multiple_files_{mode}"
    files_path = f"{table_name}_data"
    select_all = f"SELECT * FROM {table_name}"

    create_table(started_cluster, node, table_name, mode, files_path)

    for file_index in range(5):
        rand_values = [[random.randint(0, 50) for _ in range(3)] for _ in range(10)]
        csv_lines = (",".join(str(cell) for cell in row) for row in rand_values)
        values_csv = ("\n".join(csv_lines) + "\n").encode()
        put_s3_file_content(
            started_cluster, f"{files_path}/test_{file_index}.csv", values_csv
        )

        fetched = [
            [int(cell) for cell in line.split()]
            for line in node.query(select_all).splitlines()
        ]
        assert fetched == rand_values

    total_values = generate_random_files(started_cluster, files_path, 4, start_ind=5)
    fetched_set = {
        tuple(int(cell) for cell in line.split())
        for line in node.query(select_all).splitlines()
    }
    assert fetched_set == {tuple(row) for row in total_values}
@pytest.mark.parametrize("mode", AVAILABLE_MODES)
def test_streaming_to_view_(started_cluster, mode):
    """Rows from ten generated files reach the destination table via the view.

    The destination is polled once per second, at most ten times, and the
    last fetched set of rows must equal the set of generated rows.
    """
    node = started_cluster.instances["instance"]
    table_name = f"streaming_to_view_{mode}"
    dst_table_name = f"{table_name}_dst"
    files_path = f"{table_name}_data"

    total_values = generate_random_files(started_cluster, files_path, 10)
    create_table(started_cluster, node, table_name, mode, files_path)
    create_mv(node, table_name, dst_table_name)

    def fetch_rows():
        output = node.query(f"SELECT column1, column2, column3 FROM {dst_table_name}")
        return {tuple(int(cell) for cell in line.split()) for line in output.splitlines()}

    expected_values = {tuple(row) for row in total_values}
    selected_values = set()
    for _ in range(10):
        selected_values = fetch_rows()
        if selected_values == expected_values:
            break
        time.sleep(1)

    assert selected_values == expected_values
@pytest.mark.parametrize("mode", AVAILABLE_MODES)
def test_streaming_to_many_views(started_cluster, mode):
    """Three queue tables on one keeper path deliver five files to one target.

    NOTE(review): ``create_mv`` drops and re-creates both the destination
    table and the view ``{dst_table_name}_mv`` on every call, so after the
    loop only the view for the last table exists - confirm this is intended.
    """
    node = started_cluster.instances["instance"]
    table_name = f"streaming_to_many_views_{mode}"
    dst_table_name = f"{table_name}_dst"
    keeper_path = f"/clickhouse/test_{table_name}"
    files_path = f"{table_name}_data"

    for suffix in (1, 2, 3):
        source_table = f"{table_name}_{suffix}"
        create_table(
            started_cluster,
            node,
            source_table,
            mode,
            files_path,
            additional_settings={
                "keeper_path": keeper_path,
            },
        )
        create_mv(node, source_table, dst_table_name)

    total_values = generate_random_files(started_cluster, files_path, 5)
    expected_values = {tuple(row) for row in total_values}

    def select():
        output = node.query(f"SELECT column1, column2, column3 FROM {dst_table_name}")
        return {tuple(int(cell) for cell in line.split()) for line in output.splitlines()}

    # Poll once per second, up to 20 times, then assert on a fresh read.
    for _ in range(20):
        if select() == expected_values:
            break
        time.sleep(1)
    assert select() == expected_values
def test_multiple_tables_meta_mismatch(started_cluster):
    """A second table on the same keeper path must match the stored metadata.

    After creating an ``ordered`` table, creating a copy on the same keeper
    path must fail when the mode, the column structure or the file format
    differs, and must succeed when everything matches.

    Each mismatch is checked with its own ``pytest.raises`` block, so a
    CREATE that unexpectedly succeeds fails the test. Previously a single
    ``failed`` flag was set by the first check and never reset, which made
    the later "did it fail" assertions pass unconditionally.
    """
    node = started_cluster.instances["instance"]
    table_name = "multiple_tables_meta_mismatch"
    keeper_path = f"/clickhouse/test_{table_name}"
    files_path = f"{table_name}_data"
    four_columns = "column1 UInt32, column2 UInt32, column3 UInt32, column4 UInt32"

    def create_copy(mode, **overrides):
        # Create ``{table_name}_copy`` on the shared keeper path.
        create_table(
            started_cluster,
            node,
            f"{table_name}_copy",
            mode,
            files_path,
            additional_settings={
                "keeper_path": keeper_path,
            },
            **overrides,
        )

    def assert_create_fails(expected_message, mode, **overrides):
        # The CREATE must raise, and the error must mention ``expected_message``.
        with pytest.raises(QueryRuntimeException) as exc_info:
            create_copy(mode, **overrides)
        assert expected_message in str(exc_info.value)

    create_table(
        started_cluster,
        node,
        table_name,
        "ordered",
        files_path,
        additional_settings={
            "keeper_path": keeper_path,
        },
    )

    # check mode
    assert_create_fails(
        "Existing table metadata in ZooKeeper differs in engine mode",
        "unordered",
    )

    # check columns
    assert_create_fails(
        "Table columns structure in ZooKeeper is different from local table structure",
        "ordered",
        format=four_columns,
    )

    # check format
    assert_create_fails(
        "Existing table metadata in ZooKeeper differs in format name",
        "ordered",
        format=four_columns,
        file_format="TSV",
    )

    # create working engine
    create_copy("ordered")
# TODO: Update the modes for this test to include "ordered" once PR #55795 is finished.
@pytest.mark.parametrize("mode", ["unordered"])
def test_multiple_tables_streaming_sync(started_cluster, mode):
    """Three tables sharing a keeper path process 300 files exactly once.

    Every table streams into its own destination table. The test waits until
    the destinations together hold one row per file, checks the union of the
    rows against the generated values, and re-checks the total after ten
    seconds to detect duplicate processing.
    """
    node = started_cluster.instances["instance"]
    table_name = f"multiple_tables_streaming_sync_{mode}"
    dst_table_name = f"{table_name}_dst"
    keeper_path = f"/clickhouse/test_{table_name}"
    files_path = f"{table_name}_data"
    files_to_generate = 300
    suffixes = (1, 2, 3)

    for suffix in suffixes:
        table = f"{table_name}_{suffix}"
        dst_table = f"{dst_table_name}_{suffix}"
        create_table(
            started_cluster,
            node,
            table,
            mode,
            files_path,
            additional_settings={
                "keeper_path": keeper_path,
            },
        )
        create_mv(node, table, dst_table)

    total_values = generate_random_files(
        started_cluster, files_path, files_to_generate, row_num=1
    )

    def get_count(table_name):
        return int(run_query(node, f"SELECT count() FROM {table_name}"))

    def total_count():
        return sum(get_count(f"{dst_table_name}_{suffix}") for suffix in suffixes)

    def read_rows(suffix):
        output = node.query(
            f"SELECT column1, column2, column3 FROM {dst_table_name}_{suffix}"
        )
        return [[int(cell) for cell in line.split()] for line in output.splitlines()]

    # Poll once per second, up to 100 times.
    for _ in range(100):
        if total_count() == files_to_generate:
            break
        time.sleep(1)

    if total_count() != files_to_generate:
        # Dump the queue state to ease debugging before failing.
        info = node.query(
            f"SELECT * FROM system.s3queue WHERE zookeeper_path like '%{table_name}' ORDER BY file_name FORMAT Vertical"
        )
        logging.debug(info)
        assert False

    res1 = read_rows(1)
    res2 = read_rows(2)
    res3 = read_rows(3)
    assert {tuple(row) for row in res1 + res2 + res3} == {
        tuple(row) for row in total_values
    }

    # Checking that all files were processed only once
    time.sleep(10)
    assert total_count() == files_to_generate
@pytest.mark.parametrize("mode", AVAILABLE_MODES)
def test_multiple_tables_streaming_sync_distributed(started_cluster, mode):
    """Two instances sharing a keeper path split 300 files between them.

    Both instances must process some rows, together they must hold every
    generated row exactly once, and the total must stay unchanged after ten
    more seconds.
    """
    node = started_cluster.instances["instance"]
    node_2 = started_cluster.instances["instance2"]
    replicas = (node, node_2)
    table_name = f"multiple_tables_streaming_sync_distributed_{mode}"
    dst_table_name = f"{table_name}_dst"
    keeper_path = f"/clickhouse/test_{table_name}"
    files_path = f"{table_name}_data"
    files_to_generate = 300
    row_num = 50
    total_rows = row_num * files_to_generate

    queue_settings = {
        "keeper_path": keeper_path,
        "s3queue_buckets": 2,
    }
    if mode == "ordered":
        # Ordered mode is the only one that gets a single processing thread.
        queue_settings["s3queue_processing_threads_num"] = 1

    for replica in replicas:
        create_table(
            started_cluster,
            replica,
            table_name,
            mode,
            files_path,
            additional_settings=queue_settings,
        )
    for replica in replicas:
        create_mv(replica, table_name, dst_table_name)

    total_values = generate_random_files(
        started_cluster, files_path, files_to_generate, row_num=row_num
    )

    def get_count(node, table_name):
        return int(run_query(node, f"SELECT count() FROM {table_name}"))

    def rows_on_both():
        return get_count(node, dst_table_name) + get_count(node_2, dst_table_name)

    # Poll once per second, up to 150 times.
    for _ in range(150):
        if rows_on_both() == total_rows:
            break
        time.sleep(1)

    if rows_on_both() != total_rows:
        # Dump the queue state to ease debugging before failing.
        info = node.query(
            f"SELECT * FROM system.s3queue WHERE zookeeper_path like '%{table_name}' ORDER BY file_name FORMAT Vertical"
        )
        logging.debug(info)
        assert False

    get_query = f"SELECT column1, column2, column3 FROM {dst_table_name}"

    def read_rows(replica):
        output = run_query(replica, get_query)
        return [[int(cell) for cell in line.split()] for line in output.splitlines()]

    res1 = read_rows(node)
    res2 = read_rows(node_2)
    logging.debug(
        f"res1 size: {len(res1)}, res2 size: {len(res2)}, total_rows: {total_rows}"
    )
    assert len(res1) + len(res2) == total_rows

    # Checking that all engines have made progress
    assert len(res1) > 0
    assert len(res2) > 0

    assert {tuple(row) for row in res1 + res2} == {tuple(row) for row in total_values}

    # Checking that all files were processed only once
    time.sleep(10)
    assert rows_on_both() == total_rows
def test_max_set_age(started_cluster):
    """Tracked files are forgotten after ``tracked_file_ttl_sec`` and re-read.

    Ten single-row files are processed once, and then a second time after
    the tracking TTL lets them be picked up again (every path ends up with
    exactly two rows). A file that cannot be parsed is then uploaded and the
    ``ObjectStorageQueueFailedFiles`` counter is expected to grow by one and
    later by two, with the parse error visible in ``system.s3queue`` and
    ``system.s3queue_log``.
    """
    node = started_cluster.instances["instance"]
    table_name = "max_set_age"
    dst_table_name = f"{table_name}_dst"
    keeper_path = f"/clickhouse/test_{table_name}"
    files_path = f"{table_name}_data"
    max_age = 20
    files_to_generate = 10

    create_table(
        started_cluster,
        node,
        table_name,
        "unordered",
        files_path,
        additional_settings={
            "keeper_path": keeper_path,
            "tracked_file_ttl_sec": max_age,
            # NOTE(review): ``max_age / 3`` is a float and is passed to a
            # ``*_ms`` setting although ``max_age`` is in seconds - confirm
            # this is intended.
            "cleanup_interval_min_ms": max_age / 3,
            "cleanup_interval_max_ms": max_age / 3,
            # This key was listed twice in the literal; once is enough.
            "loading_retries": 0,
            "processing_threads_num": 1,
        },
    )
    create_mv(node, table_name, dst_table_name)

    _ = generate_random_files(started_cluster, files_path, files_to_generate, row_num=1)

    expected_rows = files_to_generate

    node.wait_for_log_line("Checking node limits")
    node.wait_for_log_line("Node limits check finished")

    def get_count():
        return int(node.query(f"SELECT count() FROM {dst_table_name}"))

    def wait_for_condition(check_function, max_wait_time=1.5 * max_age):
        # Poll ``check_function`` every 0.25 s; fail once the time is up.
        before = time.time()
        while time.time() - before < max_wait_time:
            if check_function():
                return
            time.sleep(0.25)
        assert False

    # First pass: every file processed once.
    wait_for_condition(lambda: get_count() == expected_rows)
    assert files_to_generate == int(
        node.query(f"SELECT uniq(_path) from {dst_table_name}")
    )

    # Second pass: the same files are processed again.
    expected_rows *= 2
    wait_for_condition(lambda: get_count() == expected_rows)
    assert files_to_generate == int(
        node.query(f"SELECT uniq(_path) from {dst_table_name}")
    )

    paths_count = [
        int(x)
        for x in node.query(
            f"SELECT count() from {dst_table_name} GROUP BY _path"
        ).splitlines()
    ]
    assert files_to_generate == len(paths_count)
    for path_count in paths_count:
        assert 2 == path_count

    def get_object_storage_failures():
        return int(
            node.query(
                "SELECT value FROM system.events WHERE name = 'ObjectStorageQueueFailedFiles' SETTINGS system_events_show_zero_values=1"
            )
        )

    failed_count = get_object_storage_failures()

    values = [
        ["failed", 1, 1],
    ]
    values_csv = (
        "\n".join((",".join(map(str, row)) for row in values)) + "\n"
    ).encode()

    # use a different filename for each test to allow running a bunch of them sequentially with --count
    file_with_error = f"max_set_age_fail_{uuid4().hex[:8]}.csv"
    put_s3_file_content(started_cluster, f"{files_path}/{file_with_error}", values_csv)

    wait_for_condition(lambda: failed_count + 1 == get_object_storage_failures())

    node.query("SYSTEM FLUSH LOGS")
    assert "Cannot parse input" in node.query(
        f"SELECT exception FROM system.s3queue WHERE file_name ilike '%{file_with_error}'"
    )
    assert 1 == int(
        node.query(
            f"SELECT count() FROM system.s3queue_log WHERE file_name ilike '%{file_with_error}' AND notEmpty(exception)"
        )
    )

    # The failing file is attempted a second time.
    wait_for_condition(lambda: failed_count + 2 == get_object_storage_failures())

    node.query("SYSTEM FLUSH LOGS")
    assert "Cannot parse input" in node.query(
        f"SELECT exception FROM system.s3queue WHERE file_name ilike '%{file_with_error}' ORDER BY processing_end_time DESC LIMIT 1"
    )
    assert 1 < int(
        node.query(
            f"SELECT count() FROM system.s3queue_log WHERE file_name ilike '%{file_with_error}' AND notEmpty(exception)"
        )
    )
def test_max_set_size(started_cluster):
    """With a tracked-files limit of 9, one of ten files is re-read each time.

    After all ten single-row files were selected once, exactly nine processed
    nodes must remain in ZooKeeper; the next SELECT returns the first
    generated row and, ten seconds later, the following SELECT returns the
    second one.
    """
    node = started_cluster.instances["instance"]
    table_name = "max_set_size"
    keeper_path = f"/clickhouse/test_{table_name}"
    files_path = f"{table_name}_data"
    files_to_generate = 10

    create_table(
        started_cluster,
        node,
        table_name,
        "unordered",
        files_path,
        additional_settings={
            "keeper_path": keeper_path,
            "s3queue_tracked_files_limit": 9,
            "s3queue_cleanup_interval_min_ms": 0,
            "s3queue_cleanup_interval_max_ms": 0,
            "s3queue_processing_threads_num": 1,
        },
    )
    total_values = generate_random_files(
        started_cluster, files_path, files_to_generate, start_ind=0, row_num=1
    )

    get_query = f"SELECT * FROM {table_name} ORDER BY column1, column2, column3"

    def select_rows():
        output = run_query(node, get_query)
        return [[int(cell) for cell in line.split()] for line in output.splitlines()]

    assert select_rows() == sorted(
        total_values, key=lambda row: (row[0], row[1], row[2])
    )
    print(total_values)

    time.sleep(10)

    zk = started_cluster.get_kazoo_client("zoo1")
    tracked = zk.get_children(f"{keeper_path}/processed/")
    assert len(tracked) == 9

    assert select_rows() == [total_values[0]]

    time.sleep(10)
    assert select_rows() == [total_values[1]]
def test_drop_table(started_cluster):
    """Dropping a queue table while it is reading files stops it cleanly.

    300 files of 100000 rows keep the table busy; once the log shows that
    reading has started, the table is dropped with ``SYNC`` and the log must
    contain one of the two shutdown messages.
    """
    node = started_cluster.instances["instance"]
    table_name = "test_drop"
    dst_table_name = f"{table_name}_dst"
    keeper_path = f"/clickhouse/test_{table_name}"
    files_path = f"{table_name}_data"
    files_to_generate = 300

    create_table(
        started_cluster,
        node,
        table_name,
        "unordered",
        files_path,
        additional_settings={
            "keeper_path": keeper_path,
            "s3queue_processing_threads_num": 5,
        },
    )
    # The returned rows are not needed; only the uploaded files matter here.
    generate_random_files(
        started_cluster, files_path, files_to_generate, start_ind=0, row_num=100000
    )
    create_mv(node, table_name, dst_table_name)

    node.wait_for_log_line("Reading from file: test_drop_data")
    node.query(f"DROP TABLE {table_name} SYNC")

    shutdown_messages = (
        f"StorageS3Queue (default.{table_name}): Table is being dropped",
        f"StorageS3Queue (default.{table_name}): Shutdown was called, stopping sync",
    )
    # Either message is acceptable; stop checking at the first one found.
    assert any(node.contains_in_log(message) for message in shutdown_messages)
def test_s3_client_reused(started_cluster):
    """Processing more files must not create additional S3 clients.

    Creating the table is expected to bump the ``S3Clients`` event by exactly
    one; afterwards ten files are added one by one and the counter must stay
    unchanged across the processing of each file.
    """
    node = started_cluster.instances["instance"]
    table_name = "test.test_s3_client_reused"
    dst_table_name = f"{table_name}_dst"
    files_path = f"{table_name}_data"
    row_num = 10

    def get_created_s3_clients_count():
        raw = node.query(
            "SELECT value FROM system.events WHERE event='S3Clients'"
        ).strip()
        # No row means the event has not been recorded yet.
        if raw == "":
            return 0
        return int(raw)

    def loaded_rows():
        return int(node.query(f"SELECT count() FROM {dst_table_name}"))

    def wait_all_processed(files_num):
        expected_count = files_num * row_num
        # Poll once per second, up to 100 times.
        for _ in range(100):
            count = loaded_rows()
            print(f"{count}/{expected_count}")
            if count == expected_count:
                break
            time.sleep(1)
        assert loaded_rows() == expected_count

    prepare_public_s3_bucket(started_cluster)

    s3_clients_before = get_created_s3_clients_count()
    create_table(
        started_cluster,
        node,
        table_name,
        "ordered",
        files_path,
        additional_settings={
            "after_processing": "delete",
            "s3queue_processing_threads_num": 1,
        },
        auth=NO_AUTH,
        bucket=started_cluster.minio_public_bucket,
    )
    s3_clients_after = get_created_s3_clients_count()
    assert s3_clients_before + 1 == s3_clients_after

    create_mv(node, table_name, dst_table_name)

    for file_index in range(10):
        s3_clients_before = get_created_s3_clients_count()
        generate_random_files(
            started_cluster,
            files_path,
            count=1,
            start_ind=file_index,
            row_num=row_num,
            bucket=started_cluster.minio_public_bucket,
        )
        wait_all_processed(file_index + 1)
        s3_clients_after = get_created_s3_clients_count()
        assert s3_clients_before == s3_clients_after
def get_processed_files(node, table_name):
    """Return the base names of files that ``system.s3queue`` marks as Processed.

    Only rows whose ``zookeeper_path`` contains ``table_name`` are
    considered; names are sorted by the query.

    Returns:
        A list of file names, empty when nothing has been processed.
        (Splitting with ``split("\\n")`` used to yield ``[""]`` in that case,
        which made callers count one processed file.)
    """
    query = (
        "\n"
        "select splitByChar('/', file_name)[-1] as file\n"
        f"from system.s3queue where zookeeper_path ilike '%{table_name}%' and status = 'Processed' order by file\n"
    )
    return node.query(query).strip().splitlines()
def get_unprocessed_files(node, table_name):
    """Return the raw query output listing ``test_<n>.csv`` files not yet Processed.

    Candidate names are ``test_0.csv`` .. ``test_299.csv``; a name is reported
    when no ``system.s3queue`` row for ``table_name`` has it with status
    ``Processed``. The result is the unparsed text returned by ``node.query``.
    """
    query = (
        "\n"
        "select concat('test_', toString(number), '.csv') as file from numbers(300)\n"
        "where file not\n"
        f"in (select splitByChar('/', file_name)[-1] from system.s3queue where zookeeper_path ilike '%{table_name}%' and status = 'Processed')\n"
    )
    return node.query(query)
@pytest.mark.parametrize("mode", ["unordered", "ordered"])
def test_processing_threads(started_cluster, mode):
    """A table with 32 processing threads processes 300 files exactly once.

    In ordered mode the number of children under ``{keeper_path}/buckets/``
    must additionally equal the number of processing threads.
    """
    node = started_cluster.instances["instance"]
    table_name = f"processing_threads_{mode}"
    dst_table_name = f"{table_name}_dst"
    keeper_path = f"/clickhouse/test_{table_name}"
    files_path = f"{table_name}_data"
    files_to_generate = 300
    processing_threads = 32

    create_table(
        started_cluster,
        node,
        table_name,
        mode,
        files_path,
        additional_settings={
            "keeper_path": keeper_path,
            "s3queue_processing_threads_num": processing_threads,
        },
    )
    create_mv(node, table_name, dst_table_name)

    total_values = generate_random_files(
        started_cluster, files_path, files_to_generate, row_num=1
    )

    def get_count(table_name):
        return int(run_query(node, f"SELECT count() FROM {table_name}"))

    # Poll once per second, up to 50 times.
    for _ in range(50):
        if get_count(f"{dst_table_name}") == files_to_generate:
            break
        time.sleep(1)

    if get_count(dst_table_name) != files_to_generate:
        # Log what was and was not processed before failing.
        processed_files = get_processed_files(node, table_name)
        unprocessed_files = get_unprocessed_files(node, table_name)
        current_count = get_count(dst_table_name)
        logging.debug(
            f"Processed files: {len(processed_files)}/{files_to_generate}, unprocessed files: {unprocessed_files}, count: {current_count}"
        )
        assert False

    output = node.query(f"SELECT column1, column2, column3 FROM {dst_table_name}")
    res = [[int(cell) for cell in line.split()] for line in output.splitlines()]
    assert {tuple(row) for row in res} == {tuple(row) for row in total_values}

    if mode == "ordered":
        zk = started_cluster.get_kazoo_client("zoo1")
        nodes = zk.get_children(f"{keeper_path}")
        print(f"Metadata nodes: {nodes}")
        bucket_nodes = zk.get_children(f"{keeper_path}/buckets/")
        assert len(bucket_nodes) == processing_threads
@pytest.mark.parametrize(
    "mode, processing_threads",
    [
        pytest.param("unordered", 1),
        pytest.param("unordered", 8),
        pytest.param("ordered", 1),
        pytest.param("ordered", 8),
    ],
)
def test_shards(started_cluster, mode, processing_threads):
    """Several queue tables on one node share a keeper path without duplicating rows.

    ``shards_num`` queue tables are created on the same instance. They all use
    the same ``keeper_path`` and the same ``files_path``, and each one feeds its
    own destination table through a materialized view. The test checks that:

    * the destination tables together end up with one row per generated file;
    * the union of their rows equals the generated values;
    * the total row count is unchanged after a further 10 second wait;
    * in ``ordered`` mode ``{keeper_path}/buckets/`` has ``shards_num`` children.
    """
    node = started_cluster.instances["instance"]
    table_name = f"test_shards_{mode}_{processing_threads}"
    dst_table_name = f"{table_name}_dst"
    keeper_path = f"/clickhouse/test_{table_name}"
    files_path = f"{table_name}_data"
    files_to_generate = 300
    shards_num = 3

    # Every later step iterates over this list, so the number of tables is
    # controlled by shards_num alone.
    dst_tables = [f"{dst_table_name}_{index}" for index in range(1, shards_num + 1)]

    for index, dst_table in enumerate(dst_tables, start=1):
        table = f"{table_name}_{index}"
        create_table(
            started_cluster,
            node,
            table,
            mode,
            files_path,
            additional_settings={
                "keeper_path": keeper_path,
                "s3queue_processing_threads_num": processing_threads,
                "s3queue_buckets": shards_num,
            },
        )
        create_mv(node, table, dst_table)

    total_values = generate_random_files(
        started_cluster, files_path, files_to_generate, row_num=1
    )

    def get_count(table):
        return int(run_query(node, f"SELECT count() FROM {table}"))

    def get_total_count():
        # Rows across all destination tables (one row per file, row_num=1).
        return sum(get_count(dst_table) for dst_table in dst_tables)

    def get_rows(dst_table):
        output = node.query(f"SELECT column1, column2, column3 FROM {dst_table}")
        return [tuple(map(int, line.split())) for line in output.splitlines()]

    for _ in range(30):
        count = get_total_count()
        if count == files_to_generate:
            break
        print(f"Current {count}/{files_to_generate}")
        time.sleep(1)

    count = get_total_count()
    if count != files_to_generate:
        processed_output = node.query(
            f"""
            select splitByChar('/', file_name)[-1] as file from system.s3queue
            where zookeeper_path ilike '%{table_name}%' and status = 'Processed' and rows_processed > 0 order by file
            """
        ).strip()
        # "".split("\n") is [""], which would be reported as one processed file.
        processed_files = processed_output.split("\n") if processed_output else []
        logging.debug(
            f"Processed files: {len(processed_files)}/{files_to_generate}: {processed_files}"
        )
        logging.debug(f"Processed rows: {count}/{files_to_generate}")

        # Assumes the generated files are named test_<i>.csv for i in
        # [0, files_to_generate) -- TODO confirm against generate_random_files.
        info = node.query(
            f"""
            select concat('test_', toString(number), '.csv') as file from numbers({files_to_generate})
            where file not in (select splitByChar('/', file_name)[-1] from system.s3queue
            where zookeeper_path ilike '%{table_name}%' and status = 'Processed' and rows_processed > 0)
            """
        )
        logging.debug(f"Unprocessed files: {info}")
        raise AssertionError(
            f"Expected {files_to_generate} rows across {dst_tables}, got {count}"
        )

    received = {row for dst_table in dst_tables for row in get_rows(dst_table)}
    assert received == {tuple(values) for values in total_values}

    # Checking that all files were processed only once
    time.sleep(10)
    assert get_total_count() == files_to_generate

    if mode == "ordered":
        zk = started_cluster.get_kazoo_client("zoo1")
        processed_nodes = zk.get_children(f"{keeper_path}/buckets/")
        assert len(processed_nodes) == shards_num
@pytest.mark.parametrize(
    "mode, processing_threads",
    [
        pytest.param("unordered", 1),
        pytest.param("unordered", 8),
        pytest.param("ordered", 1),
        pytest.param("ordered", 8),
    ],
)
def test_shards_distributed(started_cluster, mode, processing_threads):
    """Two nodes consume the same queue without losing or duplicating rows.

    The same queue table (same ``keeper_path`` and ``files_path``) is created
    on ``instance`` and ``instance2``, each with a materialized view into a
    local destination table. The test checks that:

    * both destination tables together hold ``row_num * files_to_generate`` rows;
    * each node contributed at least one row;
    * the union of the rows equals the generated values;
    * the total is unchanged after a further wait, and again after restarting
      the first node;
    * in ``ordered`` mode ``{keeper_path}/buckets/`` has ``shards_num`` children.
    """
    node = started_cluster.instances["instance"]
    node_2 = started_cluster.instances["instance2"]
    instances = [node, node_2]
    table_name = f"test_shards_distributed_{mode}_{processing_threads}"
    dst_table_name = f"{table_name}_dst"
    keeper_path = f"/clickhouse/test_{table_name}"
    files_path = f"{table_name}_data"
    files_to_generate = 300
    row_num = 50
    total_rows = row_num * files_to_generate
    shards_num = 2

    for instance in instances:
        create_table(
            started_cluster,
            instance,
            table_name,
            mode,
            files_path,
            additional_settings={
                "keeper_path": keeper_path,
                "s3queue_processing_threads_num": processing_threads,
                "s3queue_buckets": shards_num,
            },
        )

    for instance in instances:
        create_mv(instance, table_name, dst_table_name)

    total_values = generate_random_files(
        started_cluster, files_path, files_to_generate, row_num=row_num
    )

    def get_count(instance, table):
        return int(run_query(instance, f"SELECT count() FROM {table}"))

    def get_total_count():
        # Rows in the destination table summed over both nodes.
        return sum(get_count(instance, dst_table_name) for instance in instances)

    def get_processed_files(instance):
        # File names (last path component) this node reports as processed.
        output = instance.query(
            f"""
            select splitByChar('/', file_name)[-1] as file from system.s3queue where zookeeper_path ilike '%{table_name}%' and status = 'Processed' and rows_processed > 0 order by file
            """
        ).strip()
        # "".split("\n") is [""], which would be counted as one file.
        return output.split("\n") if output else []

    def print_debug_info():
        # Log per-node progress, missing files and files seen by both nodes.
        files1, files2 = (get_processed_files(instance) for instance in instances)
        logging.debug(
            f"Processed files by node 1: {len(files1)}/{files_to_generate}"
        )
        logging.debug(
            f"Processed files by node 2: {len(files2)}/{files_to_generate}"
        )
        logging.debug(f"Processed rows: {get_total_count()}/{total_rows}")

        # Assumes the generated files are named test_<i>.csv for i in
        # [0, files_to_generate) -- TODO confirm against generate_random_files.
        info = node.query(
            f"""
            select concat('test_', toString(number), '.csv') as file from numbers({files_to_generate})
            where file not in (select splitByChar('/', file_name)[-1] from clusterAllReplicas(default, system.s3queue)
            where zookeeper_path ilike '%{table_name}%' and status = 'Processed' and rows_processed > 0)
            """
        )
        logging.debug(f"Unprocessed files: {info}")

        on_node_2 = set(files2)
        intersecting = [name for name in files1 if name in on_node_2]
        logging.debug(f"Intersecting files: {intersecting}")

    for _ in range(30):
        if get_total_count() == total_rows:
            break
        time.sleep(1)

    count = get_total_count()
    if count != total_rows:
        print_debug_info()
        raise AssertionError(
            f"Expected {total_rows} rows across both nodes, got {count}"
        )

    get_query = f"SELECT column1, column2, column3 FROM {dst_table_name}"
    res1 = [
        list(map(int, line.split()))
        for line in run_query(node, get_query).splitlines()
    ]
    res2 = [
        list(map(int, line.split()))
        for line in run_query(node_2, get_query).splitlines()
    ]

    # Logged on every run, not only on failure, so the split of work between
    # the two nodes is always visible in the test log.
    logging.debug(
        f"res1 size: {len(res1)}, res2 size: {len(res2)}, total_rows: {total_rows}"
    )
    print_debug_info()

    assert len(res1) + len(res2) == total_rows

    # Checking that all engines have made progress
    assert len(res1) > 0
    assert len(res2) > 0

    assert {tuple(v) for v in res1 + res2} == {tuple(i) for i in total_values}

    # Checking that all files were processed only once
    time.sleep(10)
    assert get_total_count() == total_rows

    if mode == "ordered":
        zk = started_cluster.get_kazoo_client("zoo1")
        processed_nodes = zk.get_children(f"{keeper_path}/buckets/")
        assert len(processed_nodes) == shards_num

    # A restart of one node must not cause files to be processed again.
    node.restart_clickhouse()
    time.sleep(10)
    assert get_total_count() == total_rows
def test_settings_check(started_cluster):
    """Creating a queue table with a conflicting buckets setting is rejected.

    A table with ``s3queue_buckets = 2`` is created on the first instance.
    Creating a table with the same keeper path but ``s3queue_buckets = 3`` on
    the second instance is expected to fail with a message that names both
    values. The table on the first instance is dropped at the end.
    """
    first_node = started_cluster.instances["instance"]
    second_node = started_cluster.instances["instance2"]
    table_name = "test_settings_check"
    keeper_path = f"/clickhouse/test_{table_name}"
    files_path = f"{table_name}_data"
    mode = "ordered"

    def settings_with_buckets(buckets):
        # Identical settings for both tables apart from the bucket count.
        return {
            "keeper_path": keeper_path,
            "s3queue_processing_threads_num": 5,
            "s3queue_buckets": buckets,
        }

    create_table(
        started_cluster,
        first_node,
        table_name,
        mode,
        files_path,
        additional_settings=settings_with_buckets(2),
    )

    # With expect_error=True the helper presumably returns the error text
    # instead of raising -- verify against create_table.
    error = create_table(
        started_cluster,
        second_node,
        table_name,
        mode,
        files_path,
        additional_settings=settings_with_buckets(3),
        expect_error=True,
    )
    expected_message = "Existing table metadata in ZooKeeper differs in buckets setting. Stored in ZooKeeper: 2, local: 3"
    assert expected_message in error

    first_node.query(f"DROP TABLE {table_name} SYNC")
@pytest.mark.parametrize("processing_threads", [1, 5])
def test_processed_file_setting(started_cluster, processing_threads):
    """``s3queue_last_processed_path`` limits which files are ingested.

    An ordered queue table is created with ``s3queue_last_processed_path``
    set to ``test_5.csv``. Ten single-row files are then generated. The test
    expects exactly 4 rows in the destination table, both before and after a
    server restart (presumably the files ordered after ``test_5.csv``).
    """
    node = started_cluster.instances["instance"]
    table_name = f"test_processed_file_setting_{processing_threads}"
    dst_table_name = f"{table_name}_dst"
    keeper_path = f"/clickhouse/test_{table_name}_{processing_threads}"
    files_path = f"{table_name}_data"
    files_to_generate = 10
    expected_rows = 4

    queue_settings = {
        "keeper_path": keeper_path,
        "s3queue_processing_threads_num": processing_threads,
        "s3queue_last_processed_path": f"{files_path}/test_5.csv",
    }
    create_table(
        started_cluster,
        node,
        table_name,
        "ordered",
        files_path,
        additional_settings=queue_settings,
    )
    generate_random_files(
        started_cluster, files_path, files_to_generate, start_ind=0, row_num=1
    )
    create_mv(node, table_name, dst_table_name)

    def get_count():
        return int(node.query(f"SELECT count() FROM {dst_table_name}"))

    def wait_and_check(expected):
        # Poll once a second for up to 20 attempts, then assert on a fresh read.
        for _ in range(20):
            if get_count() == expected:
                break
            time.sleep(1)
        assert expected == get_count()

    wait_and_check(expected_rows)

    node.restart_clickhouse()
    time.sleep(10)

    # The row count must be the same once the server is back up.
    wait_and_check(expected_rows)
@pytest.mark.parametrize("processing_threads", [1, 5])
def test_processed_file_setting_distributed(started_cluster, processing_threads):
    """``s3queue_last_processed_path`` is honoured when two nodes share a queue.

    The same ordered queue table (2 buckets, ``s3queue_last_processed_path``
    set to ``test_5.csv``) is created on both instances. Ten single-row files
    are generated. The destination tables on the two nodes are expected to
    hold 4 rows in total, both before and after restarting both nodes.
    """
    node = started_cluster.instances["instance"]
    node_2 = started_cluster.instances["instance2"]
    both_nodes = [node, node_2]
    table_name = f"test_processed_file_setting_distributed_{processing_threads}"
    dst_table_name = f"{table_name}_dst"
    keeper_path = f"/clickhouse/test_{table_name}"
    files_path = f"{table_name}_data"
    files_to_generate = 10
    expected_rows = 4

    queue_settings = {
        "keeper_path": keeper_path,
        "s3queue_processing_threads_num": processing_threads,
        "s3queue_last_processed_path": f"{files_path}/test_5.csv",
        "s3queue_buckets": 2,
    }
    for instance in both_nodes:
        create_table(
            started_cluster,
            instance,
            table_name,
            "ordered",
            files_path,
            additional_settings=queue_settings,
        )

    generate_random_files(
        started_cluster, files_path, files_to_generate, start_ind=0, row_num=1
    )

    for instance in both_nodes:
        create_mv(instance, table_name, dst_table_name)

    def get_count():
        # Rows in the destination table, first node then second node.
        query = f"SELECT count() FROM {dst_table_name}"
        on_first = int(node.query(query))
        on_second = int(node_2.query(query))
        return on_first + on_second

    def wait_and_check(expected):
        # Poll once a second for up to 20 attempts, then assert on a fresh read.
        for _ in range(20):
            if get_count() == expected:
                break
            time.sleep(1)
        assert expected == get_count()

    wait_and_check(expected_rows)

    for instance in both_nodes:
        instance.restart_clickhouse()
    time.sleep(10)

    # The combined row count must be the same after both restarts.
    wait_and_check(expected_rows)
def test_upgrade(started_cluster):
    """Rows ingested on ``old_instance`` are still there after an upgrade.

    An ordered queue table is created on ``old_instance`` and ten single-row
    files are generated. Once the destination table holds 10 rows, the node is
    restarted with the latest version and the count is expected to be 10 still.
    """
    node = started_cluster.instances["old_instance"]
    table_name = "test_upgrade"
    dst_table_name = f"{table_name}_dst"
    keeper_path = f"/clickhouse/test_{table_name}"
    files_path = f"{table_name}_data"
    files_to_generate = 10
    expected_rows = 10

    create_table(
        started_cluster,
        node,
        table_name,
        "ordered",
        files_path,
        additional_settings={"keeper_path": keeper_path},
    )
    generate_random_files(
        started_cluster, files_path, files_to_generate, start_ind=0, row_num=1
    )
    create_mv(node, table_name, dst_table_name)

    def get_count():
        return int(node.query(f"SELECT count() FROM {dst_table_name}"))

    # Up to 20 one-second polls while the files are being ingested.
    attempts_left = 20
    while attempts_left > 0 and get_count() != expected_rows:
        time.sleep(1)
        attempts_left -= 1
    assert expected_rows == get_count()

    node.restart_with_latest_version()

    # Checked immediately after the restart, with no extra wait.
    assert expected_rows == get_count()
def test_exception_during_insert(started_cluster):
    """Files that failed on insert are ingested once the insert limit is raised.

    Runs on ``instance_too_many_parts``. The test waits for a "Too many parts"
    failure in the server log and checks that ``system.s3queue`` exposes it in
    its ``exception`` column. It then rewrites ``parts_to_throw_insert`` from 0
    to 10 in the merge tree config, restarts the server, and expects all 10
    rows to reach the destination table.
    """
    node = started_cluster.instances["instance_too_many_parts"]
    table_name = "test_exception_during_insert"
    dst_table_name = f"{table_name}_dst"
    keeper_path = f"/clickhouse/test_{table_name}"
    files_path = f"{table_name}_data"
    files_to_generate = 10
    expected_rows = 10

    create_table(
        started_cluster,
        node,
        table_name,
        "unordered",
        files_path,
        additional_settings={"keeper_path": keeper_path},
    )
    generate_random_files(
        started_cluster, files_path, files_to_generate, start_ind=0, row_num=1
    )
    create_mv(node, table_name, dst_table_name)

    node.wait_for_log_line(
        "Failed to process data: Code: 252. DB::Exception: Too many parts"
    )
    # Short pause before reading system.s3queue (presumably so the failure has
    # been recorded there -- TODO confirm).
    time.sleep(2)

    recorded_exception = node.query(
        f"SELECT exception FROM system.s3queue WHERE zookeeper_path ilike '%{table_name}%' and notEmpty(exception)"
    )
    assert "Too many parts" in recorded_exception

    # Raise the insert limit and restart so the new config is picked up.
    node.replace_in_config(
        "/etc/clickhouse-server/config.d/merge_tree.xml",
        "parts_to_throw_insert>0",
        "parts_to_throw_insert>10",
    )
    node.restart_clickhouse()

    def get_count():
        return int(node.query(f"SELECT count() FROM {dst_table_name}"))

    for _ in range(20):
        if get_count() == expected_rows:
            break
        time.sleep(1)
    assert expected_rows == get_count()
def test_commit_on_limit(started_cluster):
    """A failing file in a batch does not stop the other files being processed.

    An ordered, single-threaded queue table is created with no loading retries
    and ``s3queue_max_processed_files_before_commit = 10``. Ten random files
    and five extra files are uploaded. One extra file, ``test_9999.csv``, has
    the string ``failed`` in its first column. The test expects:

    * ``test_999999.csv`` to be reported as processed within 30 seconds;
    * the ``ObjectStorageQueueFailedFiles`` event to equal 1;
    * ``test_0.csv`` .. ``test_9.csv`` to be reported as processed;
    * ``test_9999.csv`` to be reported as failed and not as processed.
    """
    node = started_cluster.instances["instance"]
    table_name = "test_commit_on_limit"
    dst_table_name = f"{table_name}_dst"
    keeper_path = f"/clickhouse/test_{table_name}"
    files_path = f"{table_name}_data"
    files_to_generate = 10

    create_table(
        started_cluster,
        node,
        table_name,
        "ordered",
        files_path,
        additional_settings={
            "keeper_path": keeper_path,
            "s3queue_processing_threads_num": 1,
            "s3queue_loading_retries": 0,
            "s3queue_max_processed_files_before_commit": 10,
        },
    )
    generate_random_files(
        started_cluster, files_path, files_to_generate, start_ind=0, row_num=1
    )

    def to_csv_bytes(rows):
        # One comma-separated line per row, newline-terminated, as bytes.
        lines = [",".join(map(str, row)) for row in rows]
        return ("\n".join(lines) + "\n").encode()

    incorrect_values_csv = to_csv_bytes([["failed", 1, 1]])
    correct_values_csv = to_csv_bytes([[1, 1, 1]])

    # Uploaded in this order; only test_9999.csv carries the bad row.
    extra_files = [
        ("test_99.csv", correct_values_csv),
        ("test_999.csv", correct_values_csv),
        ("test_9999.csv", incorrect_values_csv),
        ("test_99999.csv", correct_values_csv),
        ("test_999999.csv", correct_values_csv),
    ]
    for file_name, content in extra_files:
        put_s3_file_content(started_cluster, f"{files_path}/{file_name}", content)

    create_mv(node, table_name, dst_table_name)

    def query_file_names(query):
        # One file name per output line.
        return node.query(query).strip().split("\n")

    def get_processed_files():
        return query_file_names(
            f"SELECT file_name FROM system.s3queue WHERE zookeeper_path ilike '%{table_name}%' and status = 'Processed' and rows_processed > 0 "
        )

    def get_failed_files():
        return query_file_names(
            f"SELECT file_name FROM system.s3queue WHERE zookeeper_path ilike '%{table_name}%' and status = 'Failed'"
        )

    last_file = "test_999999.csv"
    for _ in range(30):
        if last_file in get_processed_files():
            break
        time.sleep(1)
    assert last_file in get_processed_files()

    failed_files_event = node.query(
        "SELECT value FROM system.events WHERE name = 'ObjectStorageQueueFailedFiles' SETTINGS system_events_show_zero_values=1"
    )
    assert 1 == int(failed_files_event)

    processed = get_processed_files()
    for index in range(files_to_generate):
        assert f"test_{index}.csv" in processed

    failed = get_failed_files()
    for value in ["test_9999.csv"]:
        assert value not in processed
        assert value in failed