ClickHouse/tests/integration/test_storage_s3_queue/test.py

1754 lines
52 KiB
Python

import io
import logging
import random
import time
import pytest
from helpers.client import QueryRuntimeException
from helpers.cluster import ClickHouseCluster, ClickHouseInstance
import json
AVAILABLE_MODES = ["unordered", "ordered"]
DEFAULT_AUTH = ["'minio'", "'minio123'"]
NO_AUTH = ["NOSIGN"]
def prepare_public_s3_bucket(started_cluster):
def create_bucket(client, bucket_name, policy):
if client.bucket_exists(bucket_name):
client.remove_bucket(bucket_name)
client.make_bucket(bucket_name)
client.set_bucket_policy(bucket_name, json.dumps(policy))
def get_policy_with_public_access(bucket_name):
return {
"Version": "2012-10-17",
"Statement": [
{
"Sid": "",
"Effect": "Allow",
"Principal": "*",
"Action": [
"s3:GetBucketLocation",
"s3:ListBucket",
],
"Resource": f"arn:aws:s3:::{bucket_name}",
},
{
"Sid": "",
"Effect": "Allow",
"Principal": "*",
"Action": [
"s3:GetObject",
"s3:PutObject",
"s3:DeleteObject",
],
"Resource": f"arn:aws:s3:::{bucket_name}/*",
},
],
}
minio_client = started_cluster.minio_client
started_cluster.minio_public_bucket = f"{started_cluster.minio_bucket}-public"
create_bucket(
minio_client,
started_cluster.minio_public_bucket,
get_policy_with_public_access(started_cluster.minio_public_bucket),
)
@pytest.fixture(autouse=True)
def s3_queue_setup_teardown(started_cluster):
instance = started_cluster.instances["instance"]
instance_2 = started_cluster.instances["instance2"]
instance.query("DROP DATABASE IF EXISTS test; CREATE DATABASE test;")
instance_2.query("DROP DATABASE IF EXISTS test; CREATE DATABASE test;")
minio = started_cluster.minio_client
objects = list(minio.list_objects(started_cluster.minio_bucket, recursive=True))
for obj in objects:
minio.remove_object(started_cluster.minio_bucket, obj.object_name)
yield # run test
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster = ClickHouseCluster(__file__)
cluster.add_instance(
"instance",
user_configs=["configs/users.xml"],
with_minio=True,
with_zookeeper=True,
main_configs=[
"configs/zookeeper.xml",
"configs/s3queue_log.xml",
],
stay_alive=True,
)
cluster.add_instance(
"instance2",
user_configs=["configs/users.xml"],
with_minio=True,
with_zookeeper=True,
main_configs=[
"configs/s3queue_log.xml",
],
stay_alive=True,
)
cluster.add_instance(
"old_instance",
with_zookeeper=True,
image="clickhouse/clickhouse-server",
tag="23.12",
stay_alive=True,
with_installed_binary=True,
use_old_analyzer=True,
)
cluster.add_instance(
"instance_too_many_parts",
user_configs=["configs/users.xml"],
with_minio=True,
with_zookeeper=True,
main_configs=[
"configs/s3queue_log.xml",
"configs/merge_tree.xml",
],
stay_alive=True,
)
logging.info("Starting cluster...")
cluster.start()
logging.info("Cluster started")
yield cluster
finally:
cluster.shutdown()
def run_query(instance, query, stdin=None, settings=None):
# type: (ClickHouseInstance, str, object, dict) -> str
logging.info("Running query '{}'...".format(query))
result = instance.query(query, stdin=stdin, settings=settings)
logging.info("Query finished")
return result
def generate_random_files(
started_cluster,
files_path,
count,
column_num=3,
row_num=10,
start_ind=0,
bucket=None,
):
files = [
(f"{files_path}/test_{i}.csv", i) for i in range(start_ind, start_ind + count)
]
files.sort(key=lambda x: x[0])
print(f"Generating files: {files}")
total_values = []
for filename, i in files:
rand_values = [
[random.randint(0, 1000) for _ in range(column_num)] for _ in range(row_num)
]
total_values += rand_values
values_csv = (
"\n".join((",".join(map(str, row)) for row in rand_values)) + "\n"
).encode()
put_s3_file_content(started_cluster, filename, values_csv, bucket)
return total_values
def put_s3_file_content(started_cluster, filename, data, bucket=None):
bucket = started_cluster.minio_bucket if bucket is None else bucket
buf = io.BytesIO(data)
started_cluster.minio_client.put_object(bucket, filename, buf, len(data))
def create_table(
started_cluster,
node,
table_name,
mode,
files_path,
format="column1 UInt32, column2 UInt32, column3 UInt32",
additional_settings={},
file_format="CSV",
auth=DEFAULT_AUTH,
bucket=None,
expect_error=False,
):
auth_params = ",".join(auth)
bucket = started_cluster.minio_bucket if bucket is None else bucket
settings = {
"s3queue_loading_retries": 0,
"after_processing": "keep",
"keeper_path": f"/clickhouse/test_{table_name}",
"mode": f"{mode}",
}
settings.update(additional_settings)
url = f"http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{files_path}/"
node.query(f"DROP TABLE IF EXISTS {table_name}")
create_query = f"""
CREATE TABLE {table_name} ({format})
ENGINE = S3Queue('{url}', {auth_params}, {file_format})
SETTINGS {",".join((k+"="+repr(v) for k, v in settings.items()))}
"""
if expect_error:
return node.query_and_get_error(create_query)
node.query(create_query)
def create_mv(
node,
src_table_name,
dst_table_name,
format="column1 UInt32, column2 UInt32, column3 UInt32",
):
mv_name = f"{dst_table_name}_mv"
node.query(
f"""
DROP TABLE IF EXISTS {dst_table_name};
DROP TABLE IF EXISTS {mv_name};
CREATE TABLE {dst_table_name} ({format}, _path String)
ENGINE = MergeTree()
ORDER BY column1;
CREATE MATERIALIZED VIEW {mv_name} TO {dst_table_name} AS SELECT *, _path FROM {src_table_name};
"""
)
@pytest.mark.parametrize("mode", AVAILABLE_MODES)
def test_delete_after_processing(started_cluster, mode):
node = started_cluster.instances["instance"]
table_name = f"test.delete_after_processing_{mode}"
dst_table_name = f"{table_name}_dst"
files_path = f"{table_name}_data"
files_num = 5
row_num = 10
total_values = generate_random_files(
started_cluster, files_path, files_num, row_num=row_num
)
create_table(
started_cluster,
node,
table_name,
mode,
files_path,
additional_settings={"after_processing": "delete"},
)
create_mv(node, table_name, dst_table_name)
expected_count = files_num * row_num
for _ in range(100):
count = int(node.query(f"SELECT count() FROM {dst_table_name}"))
print(f"{count}/{expected_count}")
if count == expected_count:
break
time.sleep(1)
assert int(node.query(f"SELECT count() FROM {dst_table_name}")) == expected_count
assert int(node.query(f"SELECT uniq(_path) FROM {dst_table_name}")) == files_num
assert [
list(map(int, l.split()))
for l in node.query(
f"SELECT column1, column2, column3 FROM {dst_table_name} ORDER BY column1, column2, column3"
).splitlines()
] == sorted(total_values, key=lambda x: (x[0], x[1], x[2]))
minio = started_cluster.minio_client
objects = list(minio.list_objects(started_cluster.minio_bucket, recursive=True))
assert len(objects) == 0
@pytest.mark.parametrize("mode", AVAILABLE_MODES)
def test_failed_retry(started_cluster, mode):
node = started_cluster.instances["instance"]
table_name = f"test.failed_retry_{mode}"
dst_table_name = f"{table_name}_dst"
files_path = f"{table_name}_data"
file_path = f"{files_path}/trash_test.csv"
keeper_path = f"/clickhouse/test_{table_name}"
retries_num = 3
values = [
["failed", 1, 1],
]
values_csv = (
"\n".join((",".join(map(str, row)) for row in values)) + "\n"
).encode()
put_s3_file_content(started_cluster, file_path, values_csv)
create_table(
started_cluster,
node,
table_name,
mode,
files_path,
additional_settings={
"s3queue_loading_retries": retries_num,
"keeper_path": keeper_path,
},
)
create_mv(node, table_name, dst_table_name)
failed_node_path = ""
for _ in range(20):
zk = started_cluster.get_kazoo_client("zoo1")
failed_nodes = zk.get_children(f"{keeper_path}/failed/")
if len(failed_nodes) > 0:
assert len(failed_nodes) == 1
failed_node_path = f"{keeper_path}/failed/{failed_nodes[0]}"
time.sleep(1)
assert failed_node_path != ""
retries = 0
for _ in range(20):
data, stat = zk.get(failed_node_path)
json_data = json.loads(data)
print(f"Failed node metadata: {json_data}")
assert json_data["file_path"] == file_path
retries = int(json_data["retries"])
if retries == retries_num:
break
time.sleep(1)
assert retries == retries_num
assert 0 == int(node.query(f"SELECT count() FROM {dst_table_name}"))
@pytest.mark.parametrize("mode", AVAILABLE_MODES)
def test_direct_select_file(started_cluster, mode):
node = started_cluster.instances["instance"]
table_name = f"test.direct_select_file_{mode}"
keeper_path = f"/clickhouse/test_{table_name}"
files_path = f"{table_name}_data"
file_path = f"{files_path}/test.csv"
values = [
[12549, 2463, 19893],
[64021, 38652, 66703],
[81611, 39650, 83516],
]
values_csv = (
"\n".join((",".join(map(str, row)) for row in values)) + "\n"
).encode()
put_s3_file_content(started_cluster, file_path, values_csv)
for i in range(3):
create_table(
started_cluster,
node,
f"{table_name}_{i + 1}",
mode,
files_path,
additional_settings={
"keeper_path": keeper_path,
"s3queue_processing_threads_num": 1,
},
)
assert [
list(map(int, l.split()))
for l in node.query(f"SELECT * FROM {table_name}_1").splitlines()
] == values
assert [
list(map(int, l.split()))
for l in node.query(f"SELECT * FROM {table_name}_2").splitlines()
] == []
assert [
list(map(int, l.split()))
for l in node.query(f"SELECT * FROM {table_name}_3").splitlines()
] == []
# New table with same zookeeper path
create_table(
started_cluster,
node,
f"{table_name}_4",
mode,
files_path,
additional_settings={
"keeper_path": keeper_path,
"s3queue_processing_threads_num": 1,
},
)
assert [
list(map(int, l.split()))
for l in node.query(f"SELECT * FROM {table_name}_4").splitlines()
] == []
# New table with different zookeeper path
keeper_path = f"/clickhouse/test_{table_name}_{mode}_2"
create_table(
started_cluster,
node,
f"{table_name}_4",
mode,
files_path,
additional_settings={
"keeper_path": keeper_path,
"s3queue_processing_threads_num": 1,
},
)
assert [
list(map(int, l.split()))
for l in node.query(f"SELECT * FROM {table_name}_4").splitlines()
] == values
values = [
[1, 1, 1],
]
values_csv = (
"\n".join((",".join(map(str, row)) for row in values)) + "\n"
).encode()
file_path = f"{files_path}/t.csv"
put_s3_file_content(started_cluster, file_path, values_csv)
if mode == "unordered":
assert [
list(map(int, l.split()))
for l in node.query(f"SELECT * FROM {table_name}_4").splitlines()
] == values
elif mode == "ordered":
assert [
list(map(int, l.split()))
for l in node.query(f"SELECT * FROM {table_name}_4").splitlines()
] == []
@pytest.mark.parametrize("mode", AVAILABLE_MODES)
def test_direct_select_multiple_files(started_cluster, mode):
node = started_cluster.instances["instance"]
table_name = f"direct_select_multiple_files_{mode}"
files_path = f"{table_name}_data"
create_table(started_cluster, node, table_name, mode, files_path)
for i in range(5):
rand_values = [[random.randint(0, 50) for _ in range(3)] for _ in range(10)]
values_csv = (
"\n".join((",".join(map(str, row)) for row in rand_values)) + "\n"
).encode()
file_path = f"{files_path}/test_{i}.csv"
put_s3_file_content(started_cluster, file_path, values_csv)
assert [
list(map(int, l.split()))
for l in node.query(f"SELECT * FROM {table_name}").splitlines()
] == rand_values
total_values = generate_random_files(started_cluster, files_path, 4, start_ind=5)
assert {
tuple(map(int, l.split()))
for l in node.query(f"SELECT * FROM {table_name}").splitlines()
} == set([tuple(i) for i in total_values])
@pytest.mark.parametrize("mode", AVAILABLE_MODES)
def test_streaming_to_view_(started_cluster, mode):
node = started_cluster.instances["instance"]
table_name = f"streaming_to_view_{mode}"
dst_table_name = f"{table_name}_dst"
files_path = f"{table_name}_data"
total_values = generate_random_files(started_cluster, files_path, 10)
create_table(started_cluster, node, table_name, mode, files_path)
create_mv(node, table_name, dst_table_name)
expected_values = set([tuple(i) for i in total_values])
for i in range(10):
selected_values = {
tuple(map(int, l.split()))
for l in node.query(
f"SELECT column1, column2, column3 FROM {dst_table_name}"
).splitlines()
}
if selected_values == expected_values:
break
time.sleep(1)
assert selected_values == expected_values
@pytest.mark.parametrize("mode", AVAILABLE_MODES)
def test_streaming_to_many_views(started_cluster, mode):
node = started_cluster.instances["instance"]
table_name = f"streaming_to_many_views_{mode}"
dst_table_name = f"{table_name}_dst"
keeper_path = f"/clickhouse/test_{table_name}"
files_path = f"{table_name}_data"
for i in range(3):
table = f"{table_name}_{i + 1}"
create_table(
started_cluster,
node,
table,
mode,
files_path,
additional_settings={
"keeper_path": keeper_path,
},
)
create_mv(node, table, dst_table_name)
total_values = generate_random_files(started_cluster, files_path, 5)
expected_values = set([tuple(i) for i in total_values])
def select():
return {
tuple(map(int, l.split()))
for l in node.query(
f"SELECT column1, column2, column3 FROM {dst_table_name}"
).splitlines()
}
for _ in range(20):
if select() == expected_values:
break
time.sleep(1)
assert select() == expected_values
def test_multiple_tables_meta_mismatch(started_cluster):
node = started_cluster.instances["instance"]
table_name = f"multiple_tables_meta_mismatch"
keeper_path = f"/clickhouse/test_{table_name}"
files_path = f"{table_name}_data"
create_table(
started_cluster,
node,
table_name,
"ordered",
files_path,
additional_settings={
"keeper_path": keeper_path,
},
)
# check mode
failed = False
try:
create_table(
started_cluster,
node,
f"{table_name}_copy",
"unordered",
files_path,
additional_settings={
"keeper_path": keeper_path,
},
)
except QueryRuntimeException as e:
assert "Existing table metadata in ZooKeeper differs in engine mode" in str(e)
failed = True
assert failed is True
# check columns
try:
create_table(
started_cluster,
node,
f"{table_name}_copy",
"ordered",
files_path,
format="column1 UInt32, column2 UInt32, column3 UInt32, column4 UInt32",
additional_settings={
"keeper_path": keeper_path,
},
)
except QueryRuntimeException as e:
assert (
"Table columns structure in ZooKeeper is different from local table structure"
in str(e)
)
failed = True
assert failed is True
# check format
try:
create_table(
started_cluster,
node,
f"{table_name}_copy",
"ordered",
files_path,
format="column1 UInt32, column2 UInt32, column3 UInt32, column4 UInt32",
additional_settings={
"keeper_path": keeper_path,
},
file_format="TSV",
)
except QueryRuntimeException as e:
assert "Existing table metadata in ZooKeeper differs in format name" in str(e)
failed = True
assert failed is True
# create working engine
create_table(
started_cluster,
node,
f"{table_name}_copy",
"ordered",
files_path,
additional_settings={
"keeper_path": keeper_path,
},
)
# TODO: Update the modes for this test to include "ordered" once PR #55795 is finished.
@pytest.mark.parametrize("mode", ["unordered"])
def test_multiple_tables_streaming_sync(started_cluster, mode):
node = started_cluster.instances["instance"]
table_name = f"multiple_tables_streaming_sync_{mode}"
dst_table_name = f"{table_name}_dst"
keeper_path = f"/clickhouse/test_{table_name}"
files_path = f"{table_name}_data"
files_to_generate = 300
for i in range(3):
table = f"{table_name}_{i + 1}"
dst_table = f"{dst_table_name}_{i + 1}"
create_table(
started_cluster,
node,
table,
mode,
files_path,
additional_settings={
"keeper_path": keeper_path,
},
)
create_mv(node, table, dst_table)
total_values = generate_random_files(
started_cluster, files_path, files_to_generate, row_num=1
)
def get_count(table_name):
return int(run_query(node, f"SELECT count() FROM {table_name}"))
for _ in range(100):
if (
get_count(f"{dst_table_name}_1")
+ get_count(f"{dst_table_name}_2")
+ get_count(f"{dst_table_name}_3")
) == files_to_generate:
break
time.sleep(1)
if (
get_count(f"{dst_table_name}_1")
+ get_count(f"{dst_table_name}_2")
+ get_count(f"{dst_table_name}_3")
) != files_to_generate:
info = node.query(
f"SELECT * FROM system.s3queue WHERE zookeeper_path like '%{table_name}' ORDER BY file_name FORMAT Vertical"
)
logging.debug(info)
assert False
res1 = [
list(map(int, l.split()))
for l in node.query(
f"SELECT column1, column2, column3 FROM {dst_table_name}_1"
).splitlines()
]
res2 = [
list(map(int, l.split()))
for l in node.query(
f"SELECT column1, column2, column3 FROM {dst_table_name}_2"
).splitlines()
]
res3 = [
list(map(int, l.split()))
for l in node.query(
f"SELECT column1, column2, column3 FROM {dst_table_name}_3"
).splitlines()
]
assert {tuple(v) for v in res1 + res2 + res3} == set(
[tuple(i) for i in total_values]
)
# Checking that all files were processed only once
time.sleep(10)
assert (
get_count(f"{dst_table_name}_1")
+ get_count(f"{dst_table_name}_2")
+ get_count(f"{dst_table_name}_3")
) == files_to_generate
@pytest.mark.parametrize("mode", AVAILABLE_MODES)
def test_multiple_tables_streaming_sync_distributed(started_cluster, mode):
node = started_cluster.instances["instance"]
node_2 = started_cluster.instances["instance2"]
table_name = f"multiple_tables_streaming_sync_distributed_{mode}"
dst_table_name = f"{table_name}_dst"
keeper_path = f"/clickhouse/test_{table_name}"
files_path = f"{table_name}_data"
files_to_generate = 300
row_num = 50
total_rows = row_num * files_to_generate
for instance in [node, node_2]:
create_table(
started_cluster,
instance,
table_name,
mode,
files_path,
additional_settings={"keeper_path": keeper_path, "s3queue_buckets": 2},
)
for instance in [node, node_2]:
create_mv(instance, table_name, dst_table_name)
total_values = generate_random_files(
started_cluster, files_path, files_to_generate, row_num=row_num
)
def get_count(node, table_name):
return int(run_query(node, f"SELECT count() FROM {table_name}"))
for _ in range(150):
if (
get_count(node, dst_table_name) + get_count(node_2, dst_table_name)
) == total_rows:
break
time.sleep(1)
if (
get_count(node, dst_table_name) + get_count(node_2, dst_table_name)
) != total_rows:
info = node.query(
f"SELECT * FROM system.s3queue WHERE zookeeper_path like '%{table_name}' ORDER BY file_name FORMAT Vertical"
)
logging.debug(info)
assert False
get_query = f"SELECT column1, column2, column3 FROM {dst_table_name}"
res1 = [list(map(int, l.split())) for l in run_query(node, get_query).splitlines()]
res2 = [
list(map(int, l.split())) for l in run_query(node_2, get_query).splitlines()
]
assert len(res1) + len(res2) == total_rows
# Checking that all engines have made progress
assert len(res1) > 0
assert len(res2) > 0
assert {tuple(v) for v in res1 + res2} == set([tuple(i) for i in total_values])
# Checking that all files were processed only once
time.sleep(10)
assert (
get_count(node, dst_table_name) + get_count(node_2, dst_table_name)
) == total_rows
def test_max_set_age(started_cluster):
node = started_cluster.instances["instance"]
table_name = f"max_set_age"
dst_table_name = f"{table_name}_dst"
keeper_path = f"/clickhouse/test_{table_name}"
files_path = f"{table_name}_data"
max_age = 10
files_to_generate = 10
create_table(
started_cluster,
node,
table_name,
"unordered",
files_path,
additional_settings={
"keeper_path": keeper_path,
"s3queue_tracked_file_ttl_sec": max_age,
"s3queue_cleanup_interval_min_ms": max_age / 3,
"s3queue_cleanup_interval_max_ms": max_age / 3,
"s3queue_loading_retries": 0,
"s3queue_processing_threads_num": 1,
"s3queue_loading_retries": 0,
},
)
create_mv(node, table_name, dst_table_name)
total_values = generate_random_files(
started_cluster, files_path, files_to_generate, row_num=1
)
expected_rows = 10
node.wait_for_log_line("Checking node limits")
node.wait_for_log_line("Node limits check finished")
def get_count():
return int(node.query(f"SELECT count() FROM {dst_table_name}"))
for _ in range(20):
if expected_rows == get_count():
break
time.sleep(1)
assert expected_rows == get_count()
assert 10 == int(node.query(f"SELECT uniq(_path) from {dst_table_name}"))
time.sleep(max_age + 5)
expected_rows = 20
for _ in range(20):
if expected_rows == get_count():
break
time.sleep(1)
assert expected_rows == get_count()
assert 10 == int(node.query(f"SELECT uniq(_path) from {dst_table_name}"))
paths_count = [
int(x)
for x in node.query(
f"SELECT count() from {dst_table_name} GROUP BY _path"
).splitlines()
]
assert 10 == len(paths_count)
for path_count in paths_count:
assert 2 == path_count
failed_count = int(
node.query(
"SELECT value FROM system.events WHERE name = 'S3QueueFailedFiles' SETTINGS system_events_show_zero_values=1"
)
)
values = [
["failed", 1, 1],
]
values_csv = (
"\n".join((",".join(map(str, row)) for row in values)) + "\n"
).encode()
put_s3_file_content(started_cluster, f"{files_path}/fff.csv", values_csv)
for _ in range(30):
if failed_count + 1 == int(
node.query(
"SELECT value FROM system.events WHERE name = 'S3QueueFailedFiles' SETTINGS system_events_show_zero_values=1"
)
):
break
time.sleep(1)
assert failed_count + 1 == int(
node.query(
"SELECT value FROM system.events WHERE name = 'S3QueueFailedFiles' SETTINGS system_events_show_zero_values=1"
)
)
node.query("SYSTEM FLUSH LOGS")
assert "Cannot parse input" in node.query(
"SELECT exception FROM system.s3queue WHERE file_name ilike '%fff.csv'"
)
assert 1 == int(
node.query(
"SELECT count() FROM system.s3queue_log WHERE file_name ilike '%fff.csv'"
)
)
assert 1 == int(
node.query(
"SELECT count() FROM system.s3queue_log WHERE file_name ilike '%fff.csv' AND notEmpty(exception)"
)
)
time.sleep(max_age + 1)
assert failed_count + 2 == int(
node.query("SELECT value FROM system.events WHERE name = 'S3QueueFailedFiles'")
)
node.query("SYSTEM FLUSH LOGS")
assert "Cannot parse input" in node.query(
"SELECT exception FROM system.s3queue WHERE file_name ilike '%fff.csv' ORDER BY processing_end_time DESC LIMIT 1"
)
assert 1 < int(
node.query(
"SELECT count() FROM system.s3queue_log WHERE file_name ilike '%fff.csv' AND notEmpty(exception)"
)
)
def test_max_set_size(started_cluster):
node = started_cluster.instances["instance"]
table_name = f"max_set_size"
dst_table_name = f"{table_name}_dst"
keeper_path = f"/clickhouse/test_{table_name}"
files_path = f"{table_name}_data"
max_age = 10
files_to_generate = 10
create_table(
started_cluster,
node,
table_name,
"unordered",
files_path,
additional_settings={
"keeper_path": keeper_path,
"s3queue_tracked_files_limit": 9,
"s3queue_cleanup_interval_min_ms": 0,
"s3queue_cleanup_interval_max_ms": 0,
"s3queue_processing_threads_num": 1,
},
)
total_values = generate_random_files(
started_cluster, files_path, files_to_generate, start_ind=0, row_num=1
)
get_query = f"SELECT * FROM {table_name} ORDER BY column1, column2, column3"
res1 = [list(map(int, l.split())) for l in run_query(node, get_query).splitlines()]
assert res1 == sorted(total_values, key=lambda x: (x[0], x[1], x[2]))
print(total_values)
time.sleep(10)
zk = started_cluster.get_kazoo_client("zoo1")
processed_nodes = zk.get_children(f"{keeper_path}/processed/")
assert len(processed_nodes) == 9
res1 = [list(map(int, l.split())) for l in run_query(node, get_query).splitlines()]
assert res1 == [total_values[0]]
time.sleep(10)
res1 = [list(map(int, l.split())) for l in run_query(node, get_query).splitlines()]
assert res1 == [total_values[1]]
def test_drop_table(started_cluster):
node = started_cluster.instances["instance"]
table_name = f"test_drop"
dst_table_name = f"{table_name}_dst"
keeper_path = f"/clickhouse/test_{table_name}"
files_path = f"{table_name}_data"
files_to_generate = 300
create_table(
started_cluster,
node,
table_name,
"unordered",
files_path,
additional_settings={
"keeper_path": keeper_path,
"s3queue_processing_threads_num": 5,
},
)
total_values = generate_random_files(
started_cluster, files_path, files_to_generate, start_ind=0, row_num=100000
)
create_mv(node, table_name, dst_table_name)
node.wait_for_log_line(f"Reading from file: test_drop_data")
node.query(f"DROP TABLE {table_name} SYNC")
assert node.contains_in_log(
f"StorageS3Queue (default.{table_name}): Table is being dropped"
) or node.contains_in_log(
f"StorageS3Queue (default.{table_name}): Shutdown was called, stopping sync"
)
def test_s3_client_reused(started_cluster):
node = started_cluster.instances["instance"]
table_name = f"test.test_s3_client_reused"
dst_table_name = f"{table_name}_dst"
files_path = f"{table_name}_data"
row_num = 10
def get_created_s3_clients_count():
value = node.query(
f"SELECT value FROM system.events WHERE event='S3Clients'"
).strip()
return int(value) if value != "" else 0
def wait_all_processed(files_num):
expected_count = files_num * row_num
for _ in range(100):
count = int(node.query(f"SELECT count() FROM {dst_table_name}"))
print(f"{count}/{expected_count}")
if count == expected_count:
break
time.sleep(1)
assert (
int(node.query(f"SELECT count() FROM {dst_table_name}")) == expected_count
)
prepare_public_s3_bucket(started_cluster)
s3_clients_before = get_created_s3_clients_count()
create_table(
started_cluster,
node,
table_name,
"ordered",
files_path,
additional_settings={
"after_processing": "delete",
"s3queue_processing_threads_num": 1,
},
auth=NO_AUTH,
bucket=started_cluster.minio_public_bucket,
)
s3_clients_after = get_created_s3_clients_count()
assert s3_clients_before + 1 == s3_clients_after
create_mv(node, table_name, dst_table_name)
for i in range(0, 10):
s3_clients_before = get_created_s3_clients_count()
generate_random_files(
started_cluster,
files_path,
count=1,
start_ind=i,
row_num=row_num,
bucket=started_cluster.minio_public_bucket,
)
wait_all_processed(i + 1)
s3_clients_after = get_created_s3_clients_count()
assert s3_clients_before == s3_clients_after
def get_processed_files(node, table_name):
return (
node.query(
f"""
select splitByChar('/', file_name)[-1] as file
from system.s3queue where zookeeper_path ilike '%{table_name}%' and status = 'Processed' order by file
"""
)
.strip()
.split("\n")
)
def get_unprocessed_files(node, table_name):
return node.query(
f"""
select concat('test_', toString(number), '.csv') as file from numbers(300)
where file not
in (select splitByChar('/', file_name)[-1] from system.s3queue where zookeeper_path ilike '%{table_name}%' and status = 'Processed')
"""
)
@pytest.mark.parametrize("mode", ["unordered", "ordered"])
def test_processing_threads(started_cluster, mode):
node = started_cluster.instances["instance"]
table_name = f"processing_threads_{mode}"
dst_table_name = f"{table_name}_dst"
keeper_path = f"/clickhouse/test_{table_name}"
files_path = f"{table_name}_data"
files_to_generate = 300
processing_threads = 32
create_table(
started_cluster,
node,
table_name,
mode,
files_path,
additional_settings={
"keeper_path": keeper_path,
"s3queue_processing_threads_num": processing_threads,
},
)
create_mv(node, table_name, dst_table_name)
total_values = generate_random_files(
started_cluster, files_path, files_to_generate, row_num=1
)
def get_count(table_name):
return int(run_query(node, f"SELECT count() FROM {table_name}"))
for _ in range(50):
if (get_count(f"{dst_table_name}")) == files_to_generate:
break
time.sleep(1)
if get_count(dst_table_name) != files_to_generate:
processed_files = get_processed_files(node, table_name)
unprocessed_files = get_unprocessed_files(node, table_name)
logging.debug(
f"Processed files: {len(processed_files)}/{files_to_generate}, unprocessed files: {unprocessed_files}, count: {get_count(dst_table_name)}"
)
assert False
res = [
list(map(int, l.split()))
for l in node.query(
f"SELECT column1, column2, column3 FROM {dst_table_name}"
).splitlines()
]
assert {tuple(v) for v in res} == set([tuple(i) for i in total_values])
if mode == "ordered":
zk = started_cluster.get_kazoo_client("zoo1")
nodes = zk.get_children(f"{keeper_path}")
print(f"Metadata nodes: {nodes}")
processed_nodes = zk.get_children(f"{keeper_path}/buckets/")
assert len(processed_nodes) == processing_threads
@pytest.mark.parametrize(
"mode, processing_threads",
[
pytest.param("unordered", 1),
pytest.param("unordered", 8),
pytest.param("ordered", 1),
pytest.param("ordered", 8),
],
)
def test_shards(started_cluster, mode, processing_threads):
node = started_cluster.instances["instance"]
table_name = f"test_shards_{mode}_{processing_threads}"
dst_table_name = f"{table_name}_dst"
keeper_path = f"/clickhouse/test_{table_name}"
files_path = f"{table_name}_data"
files_to_generate = 300
shards_num = 3
for i in range(shards_num):
table = f"{table_name}_{i + 1}"
dst_table = f"{dst_table_name}_{i + 1}"
create_table(
started_cluster,
node,
table,
mode,
files_path,
additional_settings={
"keeper_path": keeper_path,
"s3queue_processing_threads_num": processing_threads,
"s3queue_buckets": shards_num,
},
)
create_mv(node, table, dst_table)
total_values = generate_random_files(
started_cluster, files_path, files_to_generate, row_num=1
)
def get_count(table_name):
return int(run_query(node, f"SELECT count() FROM {table_name}"))
for _ in range(30):
count = (
get_count(f"{dst_table_name}_1")
+ get_count(f"{dst_table_name}_2")
+ get_count(f"{dst_table_name}_3")
)
if count == files_to_generate:
break
print(f"Current {count}/{files_to_generate}")
time.sleep(1)
if (
get_count(f"{dst_table_name}_1")
+ get_count(f"{dst_table_name}_2")
+ get_count(f"{dst_table_name}_3")
) != files_to_generate:
processed_files = (
node.query(
f"""
select splitByChar('/', file_name)[-1] as file from system.s3queue
where zookeeper_path ilike '%{table_name}%' and status = 'Processed' and rows_processed > 0 order by file
"""
)
.strip()
.split("\n")
)
logging.debug(
f"Processed files: {len(processed_files)}/{files_to_generate}: {processed_files}"
)
count = (
get_count(f"{dst_table_name}_1")
+ get_count(f"{dst_table_name}_2")
+ get_count(f"{dst_table_name}_3")
)
logging.debug(f"Processed rows: {count}/{files_to_generate}")
info = node.query(
f"""
select concat('test_', toString(number), '.csv') as file from numbers(300)
where file not in (select splitByChar('/', file_name)[-1] from system.s3queue
where zookeeper_path ilike '%{table_name}%' and status = 'Processed' and rows_processed > 0)
"""
)
logging.debug(f"Unprocessed files: {info}")
assert False
res1 = [
list(map(int, l.split()))
for l in node.query(
f"SELECT column1, column2, column3 FROM {dst_table_name}_1"
).splitlines()
]
res2 = [
list(map(int, l.split()))
for l in node.query(
f"SELECT column1, column2, column3 FROM {dst_table_name}_2"
).splitlines()
]
res3 = [
list(map(int, l.split()))
for l in node.query(
f"SELECT column1, column2, column3 FROM {dst_table_name}_3"
).splitlines()
]
assert {tuple(v) for v in res1 + res2 + res3} == set(
[tuple(i) for i in total_values]
)
# Checking that all files were processed only once
time.sleep(10)
assert (
get_count(f"{dst_table_name}_1")
+ get_count(f"{dst_table_name}_2")
+ get_count(f"{dst_table_name}_3")
) == files_to_generate
if mode == "ordered":
zk = started_cluster.get_kazoo_client("zoo1")
processed_nodes = zk.get_children(f"{keeper_path}/buckets/")
assert len(processed_nodes) == shards_num
@pytest.mark.parametrize(
"mode, processing_threads",
[
pytest.param("unordered", 1),
pytest.param("unordered", 8),
pytest.param("ordered", 1),
pytest.param("ordered", 8),
],
)
def test_shards_distributed(started_cluster, mode, processing_threads):
node = started_cluster.instances["instance"]
node_2 = started_cluster.instances["instance2"]
table_name = f"test_shards_distributed_{mode}_{processing_threads}"
dst_table_name = f"{table_name}_dst"
keeper_path = f"/clickhouse/test_{table_name}"
files_path = f"{table_name}_data"
files_to_generate = 300
row_num = 50
total_rows = row_num * files_to_generate
shards_num = 2
i = 0
for instance in [node, node_2]:
create_table(
started_cluster,
instance,
table_name,
mode,
files_path,
additional_settings={
"keeper_path": keeper_path,
"s3queue_processing_threads_num": processing_threads,
"s3queue_buckets": shards_num,
},
)
i += 1
for instance in [node, node_2]:
create_mv(instance, table_name, dst_table_name)
total_values = generate_random_files(
started_cluster, files_path, files_to_generate, row_num=row_num
)
def get_count(node, table_name):
return int(run_query(node, f"SELECT count() FROM {table_name}"))
for _ in range(30):
if (
get_count(node, dst_table_name) + get_count(node_2, dst_table_name)
) == total_rows:
break
time.sleep(1)
if (
get_count(node, dst_table_name) + get_count(node_2, dst_table_name)
) != total_rows:
processed_files = (
node.query(
f"""
select splitByChar('/', file_name)[-1] as file from system.s3queue where zookeeper_path ilike '%{table_name}%' and status = 'Processed' and rows_processed > 0 order by file
"""
)
.strip()
.split("\n")
)
logging.debug(
f"Processed files by node 1: {len(processed_files)}/{files_to_generate}"
)
processed_files = (
node_2.query(
f"""
select splitByChar('/', file_name)[-1] as file from system.s3queue where zookeeper_path ilike '%{table_name}%' and status = 'Processed' and rows_processed > 0 order by file
"""
)
.strip()
.split("\n")
)
logging.debug(
f"Processed files by node 2: {len(processed_files)}/{files_to_generate}"
)
count = get_count(node, dst_table_name) + get_count(node_2, dst_table_name)
logging.debug(f"Processed rows: {count}/{files_to_generate}")
info = node.query(
f"""
select concat('test_', toString(number), '.csv') as file from numbers(300)
where file not in (select splitByChar('/', file_name)[-1] from clusterAllReplicas(default, system.s3queue)
where zookeeper_path ilike '%{table_name}%' and status = 'Processed' and rows_processed > 0)
"""
)
logging.debug(f"Unprocessed files: {info}")
files1 = (
node.query(
f"""
select splitByChar('/', file_name)[-1] from system.s3queue
where zookeeper_path ilike '%{table_name}%' and status = 'Processed' and rows_processed > 0
"""
)
.strip()
.split("\n")
)
files2 = (
node_2.query(
f"""
select splitByChar('/', file_name)[-1] from system.s3queue
where zookeeper_path ilike '%{table_name}%' and status = 'Processed' and rows_processed > 0
"""
)
.strip()
.split("\n")
)
def intersection(list_a, list_b):
return [e for e in list_a if e in list_b]
logging.debug(f"Intersecting files: {intersection(files1, files2)}")
assert False
get_query = f"SELECT column1, column2, column3 FROM {dst_table_name}"
res1 = [list(map(int, l.split())) for l in run_query(node, get_query).splitlines()]
res2 = [
list(map(int, l.split())) for l in run_query(node_2, get_query).splitlines()
]
assert len(res1) + len(res2) == total_rows
# Checking that all engines have made progress
assert len(res1) > 0
assert len(res2) > 0
assert {tuple(v) for v in res1 + res2} == set([tuple(i) for i in total_values])
# Checking that all files were processed only once
time.sleep(10)
assert (
get_count(node, dst_table_name) + get_count(node_2, dst_table_name)
) == total_rows
if mode == "ordered":
zk = started_cluster.get_kazoo_client("zoo1")
processed_nodes = zk.get_children(f"{keeper_path}/buckets/")
assert len(processed_nodes) == shards_num
node.restart_clickhouse()
time.sleep(10)
assert (
get_count(node, dst_table_name) + get_count(node_2, dst_table_name)
) == total_rows
def test_settings_check(started_cluster):
node = started_cluster.instances["instance"]
node_2 = started_cluster.instances["instance2"]
table_name = f"test_settings_check"
dst_table_name = f"{table_name}_dst"
keeper_path = f"/clickhouse/test_{table_name}"
files_path = f"{table_name}_data"
mode = "ordered"
create_table(
started_cluster,
node,
table_name,
mode,
files_path,
additional_settings={
"keeper_path": keeper_path,
"s3queue_processing_threads_num": 5,
"s3queue_buckets": 2,
},
)
assert (
"Existing table metadata in ZooKeeper differs in s3queue_buckets setting. Stored in ZooKeeper: 2, local: 3"
in create_table(
started_cluster,
node_2,
table_name,
mode,
files_path,
additional_settings={
"keeper_path": keeper_path,
"s3queue_processing_threads_num": 5,
"s3queue_buckets": 3,
},
expect_error=True,
)
)
node.query(f"DROP TABLE {table_name} SYNC")
@pytest.mark.parametrize("processing_threads", [1, 5])
def test_processed_file_setting(started_cluster, processing_threads):
node = started_cluster.instances["instance"]
table_name = f"test_processed_file_setting_{processing_threads}"
dst_table_name = f"{table_name}_dst"
keeper_path = f"/clickhouse/test_{table_name}_{processing_threads}"
files_path = f"{table_name}_data"
files_to_generate = 10
create_table(
started_cluster,
node,
table_name,
"ordered",
files_path,
additional_settings={
"keeper_path": keeper_path,
"s3queue_processing_threads_num": processing_threads,
"s3queue_last_processed_path": f"{files_path}/test_5.csv",
},
)
total_values = generate_random_files(
started_cluster, files_path, files_to_generate, start_ind=0, row_num=1
)
create_mv(node, table_name, dst_table_name)
def get_count():
return int(node.query(f"SELECT count() FROM {dst_table_name}"))
expected_rows = 4
for _ in range(20):
if expected_rows == get_count():
break
time.sleep(1)
assert expected_rows == get_count()
node.restart_clickhouse()
time.sleep(10)
expected_rows = 4
for _ in range(20):
if expected_rows == get_count():
break
time.sleep(1)
assert expected_rows == get_count()
@pytest.mark.parametrize("processing_threads", [1, 5])
def test_processed_file_setting_distributed(started_cluster, processing_threads):
node = started_cluster.instances["instance"]
node_2 = started_cluster.instances["instance2"]
table_name = f"test_processed_file_setting_distributed_{processing_threads}"
dst_table_name = f"{table_name}_dst"
keeper_path = f"/clickhouse/test_{table_name}"
files_path = f"{table_name}_data"
files_to_generate = 10
for instance in [node, node_2]:
create_table(
started_cluster,
instance,
table_name,
"ordered",
files_path,
additional_settings={
"keeper_path": keeper_path,
"s3queue_processing_threads_num": processing_threads,
"s3queue_last_processed_path": f"{files_path}/test_5.csv",
"s3queue_buckets": 2,
},
)
total_values = generate_random_files(
started_cluster, files_path, files_to_generate, start_ind=0, row_num=1
)
for instance in [node, node_2]:
create_mv(instance, table_name, dst_table_name)
def get_count():
query = f"SELECT count() FROM {dst_table_name}"
return int(node.query(query)) + int(node_2.query(query))
expected_rows = 4
for _ in range(20):
if expected_rows == get_count():
break
time.sleep(1)
assert expected_rows == get_count()
for instance in [node, node_2]:
instance.restart_clickhouse()
time.sleep(10)
expected_rows = 4
for _ in range(20):
if expected_rows == get_count():
break
time.sleep(1)
assert expected_rows == get_count()
def test_upgrade(started_cluster):
node = started_cluster.instances["old_instance"]
table_name = f"test_upgrade"
dst_table_name = f"{table_name}_dst"
keeper_path = f"/clickhouse/test_{table_name}"
files_path = f"{table_name}_data"
files_to_generate = 10
create_table(
started_cluster,
node,
table_name,
"ordered",
files_path,
additional_settings={
"keeper_path": keeper_path,
},
)
total_values = generate_random_files(
started_cluster, files_path, files_to_generate, start_ind=0, row_num=1
)
create_mv(node, table_name, dst_table_name)
def get_count():
return int(node.query(f"SELECT count() FROM {dst_table_name}"))
expected_rows = 10
for _ in range(20):
if expected_rows == get_count():
break
time.sleep(1)
assert expected_rows == get_count()
node.restart_with_latest_version()
assert expected_rows == get_count()
def test_exception_during_insert(started_cluster):
node = started_cluster.instances["instance_too_many_parts"]
table_name = f"test_exception_during_insert"
dst_table_name = f"{table_name}_dst"
keeper_path = f"/clickhouse/test_{table_name}"
files_path = f"{table_name}_data"
files_to_generate = 10
create_table(
started_cluster,
node,
table_name,
"unordered",
files_path,
additional_settings={
"keeper_path": keeper_path,
},
)
total_values = generate_random_files(
started_cluster, files_path, files_to_generate, start_ind=0, row_num=1
)
create_mv(node, table_name, dst_table_name)
node.wait_for_log_line(
"Failed to process data: Code: 252. DB::Exception: Too many parts"
)
time.sleep(2)
exception = node.query(
f"SELECT exception FROM system.s3queue WHERE zookeeper_path ilike '%{table_name}%' and notEmpty(exception)"
)
assert "Too many parts" in exception
node.replace_in_config(
"/etc/clickhouse-server/config.d/merge_tree.xml",
"parts_to_throw_insert>0",
"parts_to_throw_insert>10",
)
node.restart_clickhouse()
def get_count():
return int(node.query(f"SELECT count() FROM {dst_table_name}"))
expected_rows = 10
for _ in range(20):
if expected_rows == get_count():
break
time.sleep(1)
assert expected_rows == get_count()
def test_commit_on_limit(started_cluster):
node = started_cluster.instances["instance"]
table_name = f"test_commit_on_limit"
dst_table_name = f"{table_name}_dst"
keeper_path = f"/clickhouse/test_{table_name}"
files_path = f"{table_name}_data"
files_to_generate = 10
create_table(
started_cluster,
node,
table_name,
"ordered",
files_path,
additional_settings={
"keeper_path": keeper_path,
"s3queue_processing_threads_num": 1,
"s3queue_loading_retries": 0,
"s3queue_max_processed_files_before_commit": 10,
},
)
total_values = generate_random_files(
started_cluster, files_path, files_to_generate, start_ind=0, row_num=1
)
incorrect_values = [
["failed", 1, 1],
]
incorrect_values_csv = (
"\n".join((",".join(map(str, row)) for row in incorrect_values)) + "\n"
).encode()
correct_values = [
[1, 1, 1],
]
correct_values_csv = (
"\n".join((",".join(map(str, row)) for row in correct_values)) + "\n"
).encode()
put_s3_file_content(
started_cluster, f"{files_path}/test_99.csv", correct_values_csv
)
put_s3_file_content(
started_cluster, f"{files_path}/test_999.csv", correct_values_csv
)
put_s3_file_content(
started_cluster, f"{files_path}/test_9999.csv", incorrect_values_csv
)
put_s3_file_content(
started_cluster, f"{files_path}/test_99999.csv", correct_values_csv
)
put_s3_file_content(
started_cluster, f"{files_path}/test_999999.csv", correct_values_csv
)
create_mv(node, table_name, dst_table_name)
def get_processed_files():
return (
node.query(
f"SELECT file_name FROM system.s3queue WHERE zookeeper_path ilike '%{table_name}%' and status = 'Processed' and rows_processed > 0 "
)
.strip()
.split("\n")
)
def get_failed_files():
return (
node.query(
f"SELECT file_name FROM system.s3queue WHERE zookeeper_path ilike '%{table_name}%' and status = 'Failed'"
)
.strip()
.split("\n")
)
for _ in range(30):
if "test_999999.csv" in get_processed_files():
break
time.sleep(1)
assert "test_999999.csv" in get_processed_files()
assert 1 == int(
node.query(
"SELECT value FROM system.events WHERE name = 'S3QueueFailedFiles' SETTINGS system_events_show_zero_values=1"
)
)
expected_processed = ["test_" + str(i) + ".csv" for i in range(files_to_generate)]
processed = get_processed_files()
for value in expected_processed:
assert value in processed
expected_failed = ["test_9999.csv"]
failed = get_failed_files()
for value in expected_failed:
assert value not in processed
assert value in failed