mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-14 19:45:11 +00:00
15e0033016
Also update the doc.
1783 lines
54 KiB
Python
1783 lines
54 KiB
Python
import io
|
|
import logging
|
|
import random
|
|
import time
|
|
|
|
import pytest
|
|
from helpers.client import QueryRuntimeException
|
|
from helpers.cluster import ClickHouseCluster, ClickHouseInstance
|
|
import json
|
|
from uuid import uuid4
|
|
|
|
|
|
# Queue processing modes used to parametrize the tests in this file.
AVAILABLE_MODES = ["unordered", "ordered"]
# Default credential arguments (already SQL-quoted) that create_table() joins
# into the engine definition.
DEFAULT_AUTH = ["'minio'", "'minio123'"]
# Alternative auth argument list; test_s3_client_reused passes it together with
# the public bucket.
NO_AUTH = ["NOSIGN"]
# Name of the Azure blob container created by the started_cluster fixture.
AZURE_CONTAINER_NAME = "cont"
|
|
|
|
|
|
def prepare_public_s3_bucket(started_cluster):
    """Create a fresh MinIO bucket whose policy allows access for any principal.

    The bucket is named ``<started_cluster.minio_bucket>-public``; that name is
    stored on ``started_cluster.minio_public_bucket``. If a bucket with this
    name already exists it is removed and created again before the policy is
    applied.
    """
    public_bucket = f"{started_cluster.minio_bucket}-public"
    bucket_arn = f"arn:aws:s3:::{public_bucket}"

    def allow_everyone(actions, resource):
        # One policy statement granting `actions` on `resource` to any principal.
        return {
            "Sid": "",
            "Effect": "Allow",
            "Principal": "*",
            "Action": actions,
            "Resource": resource,
        }

    public_policy = {
        "Version": "2012-10-17",
        "Statement": [
            # Bucket-level permissions.
            allow_everyone(["s3:GetBucketLocation", "s3:ListBucket"], bucket_arn),
            # Object-level permissions.
            allow_everyone(
                ["s3:GetObject", "s3:PutObject", "s3:DeleteObject"],
                f"{bucket_arn}/*",
            ),
        ],
    }

    client = started_cluster.minio_client
    started_cluster.minio_public_bucket = public_bucket

    if client.bucket_exists(public_bucket):
        client.remove_bucket(public_bucket)
    client.make_bucket(public_bucket)
    client.set_bucket_policy(public_bucket, json.dumps(public_policy))
|
|
|
|
|
|
@pytest.fixture(autouse=True)
def s3_queue_setup_teardown(started_cluster):
    """Reset state before every test.

    Recreates the ``test`` database on ``instance`` and ``instance2`` and
    removes every object from the cluster's MinIO bucket, then yields to the
    test. Nothing is done after the test finishes.
    """
    nodes = [started_cluster.instances[name] for name in ("instance", "instance2")]
    for node in nodes:
        node.query("DROP DATABASE IF EXISTS test; CREATE DATABASE test;")

    bucket = started_cluster.minio_bucket
    client = started_cluster.minio_client
    # Take a full snapshot of the listing before deleting anything.
    leftovers = list(client.list_objects(bucket, recursive=True))
    for entry in leftovers:
        client.remove_object(bucket, entry.object_name)

    yield  # run test
|
|
|
|
|
|
@pytest.fixture(scope="module")
def started_cluster():
    """Module-scoped fixture that starts the test cluster and yields it.

    Registers four instances (``instance``, ``instance2``, ``old_instance`` and
    ``instance_too_many_parts``), starts the cluster, creates the Azure blob
    container named by ``AZURE_CONTAINER_NAME`` and yields the cluster object.
    The cluster is shut down when the module's tests finish or when setup
    fails after the cluster object has been constructed.
    """
    # Constructed outside ``try``: if the constructor itself raises there is
    # nothing to shut down, and the original error must not be masked by an
    # unbound ``cluster`` name in the ``finally`` clause.
    cluster = ClickHouseCluster(__file__)
    try:
        cluster.add_instance(
            "instance",
            user_configs=["configs/users.xml"],
            with_minio=True,
            with_azurite=True,
            with_zookeeper=True,
            main_configs=[
                "configs/zookeeper.xml",
                "configs/s3queue_log.xml",
            ],
            stay_alive=True,
        )
        cluster.add_instance(
            "instance2",
            user_configs=["configs/users.xml"],
            with_minio=True,
            with_zookeeper=True,
            main_configs=[
                "configs/s3queue_log.xml",
            ],
            stay_alive=True,
        )
        cluster.add_instance(
            "old_instance",
            with_zookeeper=True,
            image="clickhouse/clickhouse-server",
            tag="23.12",
            stay_alive=True,
            with_installed_binary=True,
            use_old_analyzer=True,
        )
        cluster.add_instance(
            "instance_too_many_parts",
            user_configs=["configs/users.xml"],
            with_minio=True,
            with_zookeeper=True,
            main_configs=[
                "configs/s3queue_log.xml",
                "configs/merge_tree.xml",
            ],
            stay_alive=True,
        )

        logging.info("Starting cluster...")
        cluster.start()
        logging.info("Cluster started")

        container_client = cluster.blob_service_client.get_container_client(
            AZURE_CONTAINER_NAME
        )
        container_client.create_container()

        yield cluster
    finally:
        cluster.shutdown()
|
|
|
|
|
|
def run_query(instance, query, stdin=None, settings=None):
    # type: (ClickHouseInstance, str, object, dict) -> str
    """Run ``query`` on ``instance`` with logging around the call.

    ``stdin`` and ``settings`` are forwarded to ``instance.query`` as keyword
    arguments. Returns whatever ``instance.query`` returns.
    """
    logging.info("Running query '%s'...", query)
    output = instance.query(query, stdin=stdin, settings=settings)
    logging.info("Query finished")
    return output
|
|
|
|
|
|
def generate_random_files(
    started_cluster,
    files_path,
    count,
    storage="s3",
    column_num=3,
    row_num=10,
    start_ind=0,
    bucket=None,
):
    """Upload ``count`` CSV files of random integers and return all rows.

    Files are named ``{files_path}/test_{i}.csv`` for ``i`` in
    ``[start_ind, start_ind + count)`` and are uploaded in lexicographic order
    of their names. Each file holds ``row_num`` rows of ``column_num`` integers
    drawn from ``[0, 1000]``. With ``storage == "s3"`` the files go through
    ``put_s3_file_content``; any other value uses ``put_azure_file_content``.

    Returns a flat list of rows (each a list of ints) in upload order.
    """
    files = sorted(
        ((f"{files_path}/test_{i}.csv", i) for i in range(start_ind, start_ind + count)),
        key=lambda pair: pair[0],
    )

    print(f"Generating files: {files}")

    upload = put_s3_file_content if storage == "s3" else put_azure_file_content

    all_rows = []
    for filename, _index in files:
        rows = []
        for _row in range(row_num):
            rows.append([random.randint(0, 1000) for _col in range(column_num)])
        all_rows.extend(rows)

        lines = [",".join(str(value) for value in row) for row in rows]
        payload = ("\n".join(lines) + "\n").encode()
        upload(started_cluster, filename, payload, bucket)
    return all_rows
|
|
|
|
|
|
def put_s3_file_content(started_cluster, filename, data, bucket=None):
    """Upload the bytes ``data`` as object ``filename`` through the MinIO client.

    The object goes into ``bucket``, or into ``started_cluster.minio_bucket``
    when ``bucket`` is None.
    """
    if bucket is None:
        bucket = started_cluster.minio_bucket
    stream = io.BytesIO(data)
    started_cluster.minio_client.put_object(bucket, filename, stream, len(data))
|
|
|
|
|
|
def put_azure_file_content(started_cluster, filename, data, bucket=None):
    """Upload the bytes ``data`` as blob ``filename`` in ``AZURE_CONTAINER_NAME``.

    ``bucket`` is accepted so the signature matches ``put_s3_file_content``;
    this function does not use it.
    """
    blob_client = started_cluster.blob_service_client.get_blob_client(
        AZURE_CONTAINER_NAME, filename
    )
    stream = io.BytesIO(data)
    blob_client.upload_blob(stream, "BlockBlob", len(data))
|
|
|
|
|
|
def create_table(
    started_cluster,
    node,
    table_name,
    mode,
    files_path,
    engine_name="S3Queue",
    format="column1 UInt32, column2 UInt32, column3 UInt32",
    additional_settings=None,
    file_format="CSV",
    auth=DEFAULT_AUTH,
    bucket=None,
    expect_error=False,
):
    """Drop and recreate a queue table on ``node``.

    Args:
        started_cluster: cluster object providing MinIO / Azurite connection data.
        node: instance on which the DROP and CREATE queries are run.
        table_name: name of the table; also used for the default keeper path.
        mode: value of the ``mode`` table setting.
        files_path: prefix inside the bucket / container the engine reads from.
        engine_name: ``"S3Queue"`` builds an S3 URL definition; any other value
            builds a definition from the Azurite connection string.
        format: column definitions placed inside ``CREATE TABLE (...)``.
        additional_settings: mapping merged over the default table settings
            (``None`` means no overrides).
        file_format: data format name passed to the engine.
        auth: already-quoted auth arguments for the S3Queue engine.
        bucket: bucket name; defaults to ``started_cluster.minio_bucket``.
        expect_error: when true, run the CREATE through
            ``node.query_and_get_error`` and return its result.

    Returns:
        The result of ``node.query_and_get_error`` if ``expect_error`` is true,
        otherwise ``None``.
    """
    auth_params = ",".join(auth)
    bucket = started_cluster.minio_bucket if bucket is None else bucket

    settings = {
        "s3queue_loading_retries": 0,
        "after_processing": "keep",
        "keeper_path": f"/clickhouse/test_{table_name}",
        "mode": f"{mode}",
    }
    if additional_settings:
        settings.update(additional_settings)

    if engine_name == "S3Queue":
        url = f"http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{files_path}/"
        engine_def = f"{engine_name}('{url}', {auth_params}, {file_format})"
    else:
        connection_string = started_cluster.env_variables["AZURITE_CONNECTION_STRING"]
        # Use the shared container constant and the requested file format
        # rather than duplicating their literal values here.
        engine_def = (
            f"{engine_name}('{connection_string}', '{AZURE_CONTAINER_NAME}', "
            f"'{files_path}/', '{file_format}')"
        )

    settings_clause = ",".join(k + "=" + repr(v) for k, v in settings.items())

    node.query(f"DROP TABLE IF EXISTS {table_name}")
    create_query = f"""
        CREATE TABLE {table_name} ({format})
        ENGINE = {engine_def}
        SETTINGS {settings_clause}
        """

    if expect_error:
        return node.query_and_get_error(create_query)

    node.query(create_query)
|
|
|
|
|
|
def create_mv(
    node,
    src_table_name,
    dst_table_name,
    format="column1 UInt32, column2 UInt32, column3 UInt32",
):
    """Recreate a MergeTree destination table and a materialized view feeding it.

    The view is named ``{dst_table_name}_mv`` and selects every column plus
    ``_path`` from ``src_table_name`` into ``dst_table_name``. The destination
    table gets the columns from ``format`` plus a ``_path String`` column. All
    statements are sent to ``node`` in a single ``query`` call.
    """
    view_name = f"{dst_table_name}_mv"
    statements = f"""
        DROP TABLE IF EXISTS {dst_table_name};
        DROP TABLE IF EXISTS {view_name};

        CREATE TABLE {dst_table_name} ({format}, _path String)
        ENGINE = MergeTree()
        ORDER BY column1;

        CREATE MATERIALIZED VIEW {view_name} TO {dst_table_name} AS SELECT *, _path FROM {src_table_name};
        """
    node.query(statements)
|
|
|
|
|
|
@pytest.mark.parametrize("mode", ["unordered", "ordered"])
@pytest.mark.parametrize("engine_name", ["S3Queue", "AzureQueue"])
def test_delete_after_processing(started_cluster, mode, engine_name):
    """With ``after_processing = 'delete'`` all rows arrive and no source files remain."""
    node = started_cluster.instances["instance"]
    table_name = f"test.delete_after_processing_{mode}_{engine_name}"
    dst_table_name = f"{table_name}_dst"
    files_path = f"{table_name}_data"
    files_num = 5
    row_num = 10
    storage = "s3" if engine_name == "S3Queue" else "azure"

    total_values = generate_random_files(
        started_cluster, files_path, files_num, row_num=row_num, storage=storage
    )
    create_table(
        started_cluster,
        node,
        table_name,
        mode,
        files_path,
        additional_settings={"after_processing": "delete"},
        engine_name=engine_name,
    )
    create_mv(node, table_name, dst_table_name)

    expected_count = files_num * row_num

    def rows_in_dst():
        return int(node.query(f"SELECT count() FROM {dst_table_name}"))

    # Poll for up to 100 iterations until every row reached the destination.
    for _ in range(100):
        count = rows_in_dst()
        print(f"{count}/{expected_count}")
        if count == expected_count:
            break
        time.sleep(1)

    assert rows_in_dst() == expected_count
    assert int(node.query(f"SELECT uniq(_path) FROM {dst_table_name}")) == files_num

    ordered_output = node.query(
        f"SELECT column1, column2, column3 FROM {dst_table_name} ORDER BY column1, column2, column3"
    )
    selected_rows = [list(map(int, line.split())) for line in ordered_output.splitlines()]
    assert selected_rows == sorted(total_values, key=lambda x: (x[0], x[1], x[2]))

    # The source storage must be empty after processing.
    if engine_name == "S3Queue":
        minio = started_cluster.minio_client
        remaining = list(minio.list_objects(started_cluster.minio_bucket, recursive=True))
        assert len(remaining) == 0
    else:
        container = started_cluster.blob_service_client.get_container_client(
            AZURE_CONTAINER_NAME
        )
        assert not any(True for _blob in container.list_blobs(files_path))
|
|
|
|
|
|
@pytest.mark.parametrize("mode", ["unordered", "ordered"])
@pytest.mark.parametrize("engine_name", ["S3Queue", "AzureQueue"])
def test_failed_retry(started_cluster, mode, engine_name):
    """Check that a file that cannot be loaded is retried ``retries_num`` times.

    Uploads one CSV file whose first column holds the string "failed", creates
    a queue table with ``s3queue_loading_retries = retries_num`` and a
    materialized view, then inspects the ``failed`` subtree under the table's
    keeper path: exactly one child is expected, its JSON payload must name the
    uploaded file and eventually report ``retries == retries_num``. No rows may
    reach the destination table.
    """
    node = started_cluster.instances["instance"]
    table_name = f"test.failed_retry_{mode}_{engine_name}"
    dst_table_name = f"{table_name}_dst"
    files_path = f"{table_name}_data"
    file_path = f"{files_path}/trash_test.csv"
    keeper_path = f"/clickhouse/test_{table_name}"
    retries_num = 3

    # A non-numeric first value; create_table's default schema declares
    # column1 as UInt32, so this row is presumably what makes loading fail.
    values = [
        ["failed", 1, 1],
    ]
    values_csv = (
        "\n".join((",".join(map(str, row)) for row in values)) + "\n"
    ).encode()
    if engine_name == "S3Queue":
        put_s3_file_content(started_cluster, file_path, values_csv)
    else:
        put_azure_file_content(started_cluster, file_path, values_csv)

    create_table(
        started_cluster,
        node,
        table_name,
        mode,
        files_path,
        additional_settings={
            "s3queue_loading_retries": retries_num,
            "keeper_path": keeper_path,
        },
        engine_name=engine_name,
    )
    create_mv(node, table_name, dst_table_name)

    failed_node_path = ""
    # NOTE(review): this loop has no `break`, so it always runs all 20
    # iterations (about 20 s of sleeping) and `failed_node_path` ends up holding
    # the child name seen most recently. It also requests a kazoo client on
    # every iteration. Confirm whether the full wait is relied upon (e.g. the
    # child name changing over time) before adding an early exit.
    for _ in range(20):
        zk = started_cluster.get_kazoo_client("zoo1")
        failed_nodes = zk.get_children(f"{keeper_path}/failed/")
        if len(failed_nodes) > 0:
            assert len(failed_nodes) == 1
            failed_node_path = f"{keeper_path}/failed/{failed_nodes[0]}"
        time.sleep(1)

    assert failed_node_path != ""

    retries = 0
    # Poll the failed node's JSON payload until the retry counter reaches the
    # configured limit (at most 20 polls, one second apart). `zk` is the client
    # obtained in the last iteration of the loop above.
    for _ in range(20):
        data, stat = zk.get(failed_node_path)
        json_data = json.loads(data)
        print(f"Failed node metadata: {json_data}")
        assert json_data["file_path"] == file_path
        retries = int(json_data["retries"])
        if retries == retries_num:
            break
        time.sleep(1)

    assert retries == retries_num
    # Nothing from the broken file may have been inserted.
    assert 0 == int(node.query(f"SELECT count() FROM {dst_table_name}"))
|
|
|
|
|
|
@pytest.mark.parametrize("mode", AVAILABLE_MODES)
def test_direct_select_file(started_cluster, mode):
    """Direct SELECTs from queue tables sharing (or not sharing) a keeper path.

    Three tables share one keeper path: the first SELECT returns the file's
    rows, the others return nothing. A fourth table on the same path also
    returns nothing; recreated on a different keeper path it returns the rows
    again. After a second file is uploaded, the expected result of another
    SELECT depends on ``mode``.
    """
    node = started_cluster.instances["instance"]
    table_name = f"test.direct_select_file_{mode}"
    keeper_path = f"/clickhouse/test_{table_name}"
    files_path = f"{table_name}_data"

    def to_csv(rows):
        return ("\n".join(",".join(map(str, row)) for row in rows) + "\n").encode()

    def make_queue(index, zk_path):
        create_table(
            started_cluster,
            node,
            f"{table_name}_{index}",
            mode,
            files_path,
            additional_settings={
                "keeper_path": zk_path,
                "s3queue_processing_threads_num": 1,
            },
        )

    def read_rows(index):
        output = node.query(f"SELECT * FROM {table_name}_{index}")
        return [list(map(int, line.split())) for line in output.splitlines()]

    values = [
        [12549, 2463, 19893],
        [64021, 38652, 66703],
        [81611, 39650, 83516],
    ]
    file_path = f"{files_path}/test.csv"
    put_s3_file_content(started_cluster, file_path, to_csv(values))

    for index in (1, 2, 3):
        make_queue(index, keeper_path)

    assert read_rows(1) == values
    assert read_rows(2) == []
    assert read_rows(3) == []

    # New table with same zookeeper path
    make_queue(4, keeper_path)
    assert read_rows(4) == []

    # New table with different zookeeper path
    keeper_path = f"/clickhouse/test_{table_name}_{mode}_2"
    make_queue(4, keeper_path)
    assert read_rows(4) == values

    values = [
        [1, 1, 1],
    ]
    file_path = f"{files_path}/t.csv"
    put_s3_file_content(started_cluster, file_path, to_csv(values))

    if mode == "unordered":
        assert read_rows(4) == values
    elif mode == "ordered":
        assert read_rows(4) == []
|
|
|
|
|
|
@pytest.mark.parametrize("mode", AVAILABLE_MODES)
def test_direct_select_multiple_files(started_cluster, mode):
    """Each direct SELECT returns only the rows of files uploaded since the previous one."""
    node = started_cluster.instances["instance"]
    table_name = f"direct_select_multiple_files_{mode}"
    files_path = f"{table_name}_data"

    create_table(started_cluster, node, table_name, mode, files_path)

    def select_lines():
        return node.query(f"SELECT * FROM {table_name}").splitlines()

    # Upload one file at a time and read it back immediately.
    for i in range(5):
        rand_values = [[random.randint(0, 50) for _ in range(3)] for _ in range(10)]
        csv_lines = [",".join(map(str, row)) for row in rand_values]
        values_csv = ("\n".join(csv_lines) + "\n").encode()

        file_path = f"{files_path}/test_{i}.csv"
        put_s3_file_content(started_cluster, file_path, values_csv)

        assert [list(map(int, line.split())) for line in select_lines()] == rand_values

    # Upload four more files at once and compare as sets.
    total_values = generate_random_files(started_cluster, files_path, 4, start_ind=5)
    selected = {tuple(map(int, line.split())) for line in select_lines()}
    assert selected == {tuple(row) for row in total_values}
|
|
|
|
|
|
@pytest.mark.parametrize("mode", AVAILABLE_MODES)
def test_streaming_to_view_(started_cluster, mode):
    """Rows of ten generated files reach the destination table through a materialized view."""
    node = started_cluster.instances["instance"]
    table_name = f"streaming_to_view_{mode}"
    dst_table_name = f"{table_name}_dst"
    files_path = f"{table_name}_data"

    total_values = generate_random_files(started_cluster, files_path, 10)
    create_table(started_cluster, node, table_name, mode, files_path)
    create_mv(node, table_name, dst_table_name)

    expected_values = {tuple(row) for row in total_values}

    def fetch_rows():
        output = node.query(f"SELECT column1, column2, column3 FROM {dst_table_name}")
        return {tuple(map(int, line.split())) for line in output.splitlines()}

    # Poll up to ten times; the final assertion uses the last polled snapshot.
    for _ in range(10):
        selected_values = fetch_rows()
        if selected_values == expected_values:
            break
        time.sleep(1)
    assert selected_values == expected_values
|
|
|
|
|
|
@pytest.mark.parametrize("mode", AVAILABLE_MODES)
def test_streaming_to_many_views(started_cluster, mode):
    """Three queue tables on one keeper path feed a single destination table."""
    node = started_cluster.instances["instance"]
    table_name = f"streaming_to_many_views_{mode}"
    dst_table_name = f"{table_name}_dst"
    keeper_path = f"/clickhouse/test_{table_name}"
    files_path = f"{table_name}_data"

    for index in (1, 2, 3):
        queue_table = f"{table_name}_{index}"
        create_table(
            started_cluster,
            node,
            queue_table,
            mode,
            files_path,
            additional_settings={
                "keeper_path": keeper_path,
            },
        )
        create_mv(node, queue_table, dst_table_name)

    total_values = generate_random_files(started_cluster, files_path, 5)
    expected_values = {tuple(row) for row in total_values}

    def select():
        output = node.query(f"SELECT column1, column2, column3 FROM {dst_table_name}")
        return {tuple(map(int, line.split())) for line in output.splitlines()}

    # Poll for up to 20 iterations, then verify with one more query.
    for _ in range(20):
        if select() == expected_values:
            break
        time.sleep(1)
    assert select() == expected_values
|
|
|
|
|
|
def test_multiple_tables_meta_mismatch(started_cluster):
    """Creating a second table on the same keeper path must fail on metadata mismatch.

    A first table is created in ``ordered`` mode. Creating a copy on the same
    keeper path must then raise ``QueryRuntimeException`` with a specific
    message when the copy differs in mode, in columns, or in file format.
    Finally a copy with matching metadata must be created successfully.
    """
    node = started_cluster.instances["instance"]
    table_name = "multiple_tables_meta_mismatch"
    keeper_path = f"/clickhouse/test_{table_name}"
    files_path = f"{table_name}_data"
    copy_name = f"{table_name}_copy"
    four_columns = "column1 UInt32, column2 UInt32, column3 UInt32, column4 UInt32"

    def create(name, mode, **overrides):
        # Every table in this test shares the same keeper path.
        create_table(
            started_cluster,
            node,
            name,
            mode,
            files_path,
            additional_settings={
                "keeper_path": keeper_path,
            },
            **overrides,
        )

    def expect_mismatch(expected_message, mode, **overrides):
        # `failed` is local to each check, so a check cannot pass merely
        # because an earlier one raised.
        failed = False
        try:
            create(copy_name, mode, **overrides)
        except QueryRuntimeException as e:
            assert expected_message in str(e)
            failed = True
        assert failed is True, f"CREATE did not fail with: {expected_message}"

    create(table_name, "ordered")

    # check mode
    expect_mismatch(
        "Existing table metadata in ZooKeeper differs in engine mode",
        "unordered",
    )

    # check columns
    expect_mismatch(
        "Table columns structure in ZooKeeper is different from local table structure",
        "ordered",
        format=four_columns,
    )

    # check format
    expect_mismatch(
        "Existing table metadata in ZooKeeper differs in format name",
        "ordered",
        format=four_columns,
        file_format="TSV",
    )

    # create working engine
    create(copy_name, "ordered")
|
|
|
|
|
|
# TODO: Update the modes for this test to include "ordered" once PR #55795 is finished.
|
|
@pytest.mark.parametrize("mode", ["unordered"])
def test_multiple_tables_streaming_sync(started_cluster, mode):
    """Three queue tables on one keeper path together process every file exactly once.

    Each queue table streams into its own destination table. The sum of rows
    over the three destinations must reach the number of generated one-row
    files, the union of rows must equal the generated data, and the sum must
    still be the same after a further 10 seconds.
    """
    node = started_cluster.instances["instance"]
    table_name = f"multiple_tables_streaming_sync_{mode}"
    dst_table_name = f"{table_name}_dst"
    keeper_path = f"/clickhouse/test_{table_name}"
    files_path = f"{table_name}_data"
    files_to_generate = 300

    dst_tables = []
    for index in (1, 2, 3):
        queue_table = f"{table_name}_{index}"
        dst_table = f"{dst_table_name}_{index}"
        create_table(
            started_cluster,
            node,
            queue_table,
            mode,
            files_path,
            additional_settings={
                "keeper_path": keeper_path,
            },
        )
        create_mv(node, queue_table, dst_table)
        dst_tables.append(dst_table)

    total_values = generate_random_files(
        started_cluster, files_path, files_to_generate, row_num=1
    )

    def rows_in_all_destinations():
        return sum(
            int(run_query(node, f"SELECT count() FROM {name}")) for name in dst_tables
        )

    for _ in range(100):
        if rows_in_all_destinations() == files_to_generate:
            break
        time.sleep(1)

    if rows_in_all_destinations() != files_to_generate:
        # Dump the queue state for debugging before failing.
        info = node.query(
            f"SELECT * FROM system.s3queue WHERE zookeeper_path like '%{table_name}' ORDER BY file_name FORMAT Vertical"
        )
        logging.debug(info)
        assert False

    collected = set()
    for name in dst_tables:
        output = node.query(f"SELECT column1, column2, column3 FROM {name}")
        collected.update(tuple(map(int, line.split())) for line in output.splitlines())
    assert collected == {tuple(row) for row in total_values}

    # Checking that all files were processed only once
    time.sleep(10)
    assert rows_in_all_destinations() == files_to_generate
|
|
|
|
|
|
@pytest.mark.parametrize("mode", AVAILABLE_MODES)
def test_multiple_tables_streaming_sync_distributed(started_cluster, mode):
    """Two instances sharing a keeper path split the files between them.

    The same queue table and materialized view are created on ``instance`` and
    ``instance2``. Together they must ingest every generated row, each instance
    must ingest at least one row, and the total must still be the same after a
    further 10 seconds.
    """
    node = started_cluster.instances["instance"]
    node_2 = started_cluster.instances["instance2"]
    both_nodes = [node, node_2]
    table_name = f"multiple_tables_streaming_sync_distributed_{mode}"
    dst_table_name = f"{table_name}_dst"
    keeper_path = f"/clickhouse/test_{table_name}"
    files_path = f"{table_name}_data"
    files_to_generate = 300
    row_num = 50
    total_rows = row_num * files_to_generate

    for instance in both_nodes:
        create_table(
            started_cluster,
            instance,
            table_name,
            mode,
            files_path,
            additional_settings={"keeper_path": keeper_path, "s3queue_buckets": 2},
        )

    for instance in both_nodes:
        create_mv(instance, table_name, dst_table_name)

    total_values = generate_random_files(
        started_cluster, files_path, files_to_generate, row_num=row_num
    )

    def rows_on_both_nodes():
        return sum(
            int(run_query(instance, f"SELECT count() FROM {dst_table_name}"))
            for instance in both_nodes
        )

    for _ in range(150):
        if rows_on_both_nodes() == total_rows:
            break
        time.sleep(1)

    if rows_on_both_nodes() != total_rows:
        # Dump the queue state for debugging before failing.
        info = node.query(
            f"SELECT * FROM system.s3queue WHERE zookeeper_path like '%{table_name}' ORDER BY file_name FORMAT Vertical"
        )
        logging.debug(info)
        assert False

    get_query = f"SELECT column1, column2, column3 FROM {dst_table_name}"

    def read_rows(instance):
        return [
            list(map(int, line.split()))
            for line in run_query(instance, get_query).splitlines()
        ]

    res1 = read_rows(node)
    res2 = read_rows(node_2)

    assert len(res1) + len(res2) == total_rows

    # Checking that all engines have made progress
    assert len(res1) > 0
    assert len(res2) > 0

    assert {tuple(row) for row in res1 + res2} == {tuple(row) for row in total_values}

    # Checking that all files were processed only once
    time.sleep(10)
    assert rows_on_both_nodes() == total_rows
|
|
|
|
|
|
def test_max_set_age(started_cluster):
    """Files are processed again once their tracking entry exceeds ``tracked_file_ttl_sec``.

    Ten one-row files are streamed into a destination table. The test waits
    until each file has been inserted once, then until each has been inserted
    a second time. It then uploads a file with an unparsable row and checks
    that the failure counter, ``system.s3queue`` and ``system.s3queue_log``
    record the failure once and later more than once.
    """
    node = started_cluster.instances["instance"]
    table_name = "max_set_age"
    dst_table_name = f"{table_name}_dst"
    keeper_path = f"/clickhouse/test_{table_name}"
    files_path = f"{table_name}_data"
    max_age = 20
    files_to_generate = 10

    create_table(
        started_cluster,
        node,
        table_name,
        "unordered",
        files_path,
        additional_settings={
            "keeper_path": keeper_path,
            "tracked_file_ttl_sec": max_age,
            # NOTE(review): max_age is used as seconds above but as a
            # millisecond value here - confirm this is intended.
            "cleanup_interval_min_ms": max_age / 3,
            "cleanup_interval_max_ms": max_age / 3,
            "loading_retries": 0,
            "processing_threads_num": 1,
        },
    )
    create_mv(node, table_name, dst_table_name)

    _ = generate_random_files(started_cluster, files_path, files_to_generate, row_num=1)

    expected_rows = files_to_generate

    node.wait_for_log_line("Checking node limits")
    node.wait_for_log_line("Node limits check finished")

    def get_count():
        return int(node.query(f"SELECT count() FROM {dst_table_name}"))

    def wait_for_condition(check_function, max_wait_time=1.5 * max_age):
        # Poll every 0.25 s until the check passes or the time budget runs out.
        before = time.time()
        while time.time() - before < max_wait_time:
            if check_function():
                return
            time.sleep(0.25)
        assert False, f"condition was not met within {max_wait_time} seconds"

    def distinct_paths():
        return int(node.query(f"SELECT uniq(_path) from {dst_table_name}"))

    # First pass: every file inserted once.
    wait_for_condition(lambda: get_count() == expected_rows)
    assert files_to_generate == distinct_paths()

    # Second pass: the same files are inserted again.
    expected_rows *= 2
    wait_for_condition(lambda: get_count() == expected_rows)
    assert files_to_generate == distinct_paths()

    paths_count = [
        int(x)
        for x in node.query(
            f"SELECT count() from {dst_table_name} GROUP BY _path"
        ).splitlines()
    ]
    assert files_to_generate == len(paths_count)
    for path_count in paths_count:
        assert 2 == path_count

    def get_object_storage_failures():
        return int(
            node.query(
                "SELECT value FROM system.events WHERE name = 'ObjectStorageQueueFailedFiles' SETTINGS system_events_show_zero_values=1"
            )
        )

    failed_count = get_object_storage_failures()

    values = [
        ["failed", 1, 1],
    ]
    values_csv = (
        "\n".join((",".join(map(str, row)) for row in values)) + "\n"
    ).encode()

    # use a different filename for each test to allow running a bunch of them sequentially with --count
    file_with_error = f"max_set_age_fail_{uuid4().hex[:8]}.csv"
    put_s3_file_content(started_cluster, f"{files_path}/{file_with_error}", values_csv)

    wait_for_condition(lambda: failed_count + 1 == get_object_storage_failures())

    node.query("SYSTEM FLUSH LOGS")
    assert "Cannot parse input" in node.query(
        f"SELECT exception FROM system.s3queue WHERE file_name ilike '%{file_with_error}'"
    )

    assert 1 == int(
        node.query(
            f"SELECT count() FROM system.s3queue_log WHERE file_name ilike '%{file_with_error}' AND notEmpty(exception)"
        )
    )

    # The broken file is attempted again, so the failure counter grows once more.
    wait_for_condition(lambda: failed_count + 2 == get_object_storage_failures())

    node.query("SYSTEM FLUSH LOGS")
    assert "Cannot parse input" in node.query(
        f"SELECT exception FROM system.s3queue WHERE file_name ilike '%{file_with_error}' ORDER BY processing_end_time DESC LIMIT 1"
    )
    assert 1 < int(
        node.query(
            f"SELECT count() FROM system.s3queue_log WHERE file_name ilike '%{file_with_error}' AND notEmpty(exception)"
        )
    )
|
|
|
|
|
|
def test_max_set_size(started_cluster):
    """With ``s3queue_tracked_files_limit = 9`` only nine processed entries are kept.

    Ten one-row files are read by a direct SELECT. After 10 seconds the
    ``processed`` keeper subtree must hold nine children and a new SELECT must
    return the first generated row; after another 10 seconds a SELECT must
    return the second generated row.
    """
    node = started_cluster.instances["instance"]
    table_name = "max_set_size"
    dst_table_name = f"{table_name}_dst"
    keeper_path = f"/clickhouse/test_{table_name}"
    files_path = f"{table_name}_data"
    max_age = 10
    files_to_generate = 10

    create_table(
        started_cluster,
        node,
        table_name,
        "unordered",
        files_path,
        additional_settings={
            "keeper_path": keeper_path,
            "s3queue_tracked_files_limit": 9,
            "s3queue_cleanup_interval_min_ms": 0,
            "s3queue_cleanup_interval_max_ms": 0,
            "s3queue_processing_threads_num": 1,
        },
    )
    total_values = generate_random_files(
        started_cluster, files_path, files_to_generate, start_ind=0, row_num=1
    )

    get_query = f"SELECT * FROM {table_name} ORDER BY column1, column2, column3"

    def read_sorted_rows():
        output = run_query(node, get_query)
        return [list(map(int, line.split())) for line in output.splitlines()]

    assert read_sorted_rows() == sorted(total_values, key=lambda x: (x[0], x[1], x[2]))
    print(total_values)

    time.sleep(10)

    zk = started_cluster.get_kazoo_client("zoo1")
    processed_nodes = zk.get_children(f"{keeper_path}/processed/")
    assert len(processed_nodes) == 9

    assert read_sorted_rows() == [total_values[0]]

    time.sleep(10)
    assert read_sorted_rows() == [total_values[1]]
|
|
|
|
|
|
def test_drop_table(started_cluster):
    """Dropping a queue table while it is reading files logs a shutdown message."""
    node = started_cluster.instances["instance"]
    table_name = "test_drop"
    dst_table_name = f"{table_name}_dst"
    keeper_path = f"/clickhouse/test_{table_name}"
    files_path = f"{table_name}_data"
    files_to_generate = 300

    create_table(
        started_cluster,
        node,
        table_name,
        "unordered",
        files_path,
        additional_settings={
            "keeper_path": keeper_path,
            "s3queue_processing_threads_num": 5,
        },
    )
    total_values = generate_random_files(
        started_cluster, files_path, files_to_generate, start_ind=0, row_num=100000
    )
    create_mv(node, table_name, dst_table_name)

    # Wait until reading has started, then drop the table synchronously.
    node.wait_for_log_line("Reading from file: test_drop_data")
    node.query(f"DROP TABLE {table_name} SYNC")

    # Either of the two messages is accepted; the second is only looked up
    # when the first is absent.
    accepted_messages = (
        f"StorageS3Queue (default.{table_name}): Table is being dropped",
        f"StorageS3Queue (default.{table_name}): Shutdown was called, stopping sync",
    )
    assert any(node.contains_in_log(message) for message in accepted_messages)
|
|
|
|
|
|
def test_s3_client_reused(started_cluster):
    """Creating the table adds one ``S3Clients`` event; processing files adds none.

    The table reads from the public bucket with ``NO_AUTH`` and deletes files
    after processing. Ten files are uploaded one at a time, and the
    ``S3Clients`` counter must be unchanged after each one is processed.
    """
    node = started_cluster.instances["instance"]
    table_name = "test.test_s3_client_reused"
    dst_table_name = f"{table_name}_dst"
    files_path = f"{table_name}_data"
    row_num = 10

    def get_created_s3_clients_count():
        raw = node.query(
            "SELECT value FROM system.events WHERE event='S3Clients'"
        ).strip()
        # An empty result is treated as zero.
        return int(raw) if raw != "" else 0

    def rows_in_dst():
        return int(node.query(f"SELECT count() FROM {dst_table_name}"))

    def wait_all_processed(files_num):
        expected_count = files_num * row_num
        for _ in range(100):
            count = rows_in_dst()
            print(f"{count}/{expected_count}")
            if count == expected_count:
                break
            time.sleep(1)
        assert rows_in_dst() == expected_count

    prepare_public_s3_bucket(started_cluster)

    s3_clients_before = get_created_s3_clients_count()

    create_table(
        started_cluster,
        node,
        table_name,
        "ordered",
        files_path,
        additional_settings={
            "after_processing": "delete",
            "s3queue_processing_threads_num": 1,
        },
        auth=NO_AUTH,
        bucket=started_cluster.minio_public_bucket,
    )

    s3_clients_after = get_created_s3_clients_count()
    assert s3_clients_before + 1 == s3_clients_after

    create_mv(node, table_name, dst_table_name)

    for file_index in range(0, 10):
        s3_clients_before = get_created_s3_clients_count()

        generate_random_files(
            started_cluster,
            files_path,
            count=1,
            start_ind=file_index,
            row_num=row_num,
            bucket=started_cluster.minio_public_bucket,
        )

        wait_all_processed(file_index + 1)

        s3_clients_after = get_created_s3_clients_count()

        assert s3_clients_before == s3_clients_after
|
|
|
|
|
|
def get_processed_files(node, table_name):
    """Return the base names of files recorded as processed for ``table_name``.

    Queries ``system.s3queue`` on ``node`` for rows whose ``zookeeper_path``
    contains ``table_name`` (case-insensitive) and whose status is
    ``Processed``, ordered by file name.

    Returns:
        A list with one file name per result line; an empty list when the
        query returns no rows.
    """
    output = node.query(
        f"""
        select splitByChar('/', file_name)[-1] as file
        from system.s3queue where zookeeper_path ilike '%{table_name}%' and status = 'Processed' order by file
        """
    )
    # splitlines() yields [] for an empty result, whereas split("\n") would
    # yield [""] and make "no files" look like one file.
    return output.strip().splitlines()
|
|
|
|
|
|
def get_unprocessed_files(node, table_name, files_num=300):
    """Return the raw query output listing generated files not yet processed.

    Builds the names ``test_0.csv`` .. ``test_{files_num - 1}.csv`` and keeps
    those that do not appear in ``system.s3queue`` with status ``Processed``
    for a ``zookeeper_path`` containing ``table_name`` (case-insensitive).

    Args:
        node: instance on which the query is run.
        table_name: substring matched against ``zookeeper_path``.
        files_num: how many generated file names to consider (default 300).

    Returns:
        The result of ``node.query`` unchanged.
    """
    return node.query(
        f"""
        select concat('test_', toString(number), '.csv') as file from numbers({files_num})
        where file not
        in (select splitByChar('/', file_name)[-1] from system.s3queue where zookeeper_path ilike '%{table_name}%' and status = 'Processed')
        """
    )
|
|
|
|
|
|
@pytest.mark.parametrize("mode", ["unordered", "ordered"])
def test_processing_threads(started_cluster, mode):
    """A table with 32 processing threads ingests all 300 one-row files.

    In ``ordered`` mode the test additionally checks that the ``buckets``
    keeper subtree has as many children as there are processing threads.
    """
    node = started_cluster.instances["instance"]
    table_name = f"processing_threads_{mode}"
    dst_table_name = f"{table_name}_dst"
    keeper_path = f"/clickhouse/test_{table_name}"
    files_path = f"{table_name}_data"
    files_to_generate = 300
    processing_threads = 32

    create_table(
        started_cluster,
        node,
        table_name,
        mode,
        files_path,
        additional_settings={
            "keeper_path": keeper_path,
            "s3queue_processing_threads_num": processing_threads,
        },
    )
    create_mv(node, table_name, dst_table_name)

    total_values = generate_random_files(
        started_cluster, files_path, files_to_generate, row_num=1
    )

    def rows_in_dst():
        return int(run_query(node, f"SELECT count() FROM {dst_table_name}"))

    for _ in range(50):
        if rows_in_dst() == files_to_generate:
            break
        time.sleep(1)

    if rows_in_dst() != files_to_generate:
        # Log which files were and were not processed before failing.
        processed_files = get_processed_files(node, table_name)
        unprocessed_files = get_unprocessed_files(node, table_name)
        logging.debug(
            f"Processed files: {len(processed_files)}/{files_to_generate}, unprocessed files: {unprocessed_files}, count: {rows_in_dst()}"
        )
        assert False

    output = node.query(f"SELECT column1, column2, column3 FROM {dst_table_name}")
    selected = {tuple(map(int, line.split())) for line in output.splitlines()}
    assert selected == {tuple(row) for row in total_values}

    if mode == "ordered":
        zk = started_cluster.get_kazoo_client("zoo1")
        nodes = zk.get_children(f"{keeper_path}")
        print(f"Metadata nodes: {nodes}")
        processed_nodes = zk.get_children(f"{keeper_path}/buckets/")
        assert len(processed_nodes) == processing_threads
|
|
|
|
|
|
@pytest.mark.parametrize(
    "mode, processing_threads",
    [
        pytest.param("unordered", 1),
        pytest.param("unordered", 8),
        pytest.param("ordered", 1),
        pytest.param("ordered", 8),
    ],
)
def test_shards(started_cluster, mode, processing_threads):
    """Check that several queue tables sharing one keeper path split the work.

    Creates `shards_num` queue tables (each with its own materialized view and
    destination table) on "instance", all using the same `keeper_path` and
    `s3queue_buckets = shards_num`. Generates 300 single-row files and expects
    the destination tables to hold exactly 300 rows in total, matching the
    generated values, and to still hold 300 rows ten seconds later (no file
    processed twice). In "ordered" mode the number of children of
    `{keeper_path}/buckets/` must equal `shards_num`.
    """
    node = started_cluster.instances["instance"]
    table_name = f"test_shards_{mode}_{processing_threads}"
    dst_table_name = f"{table_name}_dst"
    keeper_path = f"/clickhouse/test_{table_name}"
    files_path = f"{table_name}_data"
    files_to_generate = 300
    shards_num = 3
    # One destination table per shard, derived from shards_num instead of
    # spelling out the "_1", "_2", "_3" suffixes at every use site.
    dst_tables = [f"{dst_table_name}_{i + 1}" for i in range(shards_num)]

    for i, dst_table in enumerate(dst_tables):
        table = f"{table_name}_{i + 1}"
        create_table(
            started_cluster,
            node,
            table,
            mode,
            files_path,
            additional_settings={
                "keeper_path": keeper_path,
                "s3queue_processing_threads_num": processing_threads,
                "s3queue_buckets": shards_num,
            },
        )
        create_mv(node, table, dst_table)

    total_values = generate_random_files(
        started_cluster, files_path, files_to_generate, row_num=1
    )

    def get_count(table_name):
        return int(run_query(node, f"SELECT count() FROM {table_name}"))

    def get_total_count():
        return sum(get_count(dst_table) for dst_table in dst_tables)

    for _ in range(30):
        count = get_total_count()
        if count == files_to_generate:
            break
        print(f"Current {count}/{files_to_generate}")
        time.sleep(1)

    count = get_total_count()
    if count != files_to_generate:
        processed_output = node.query(
            f"""
            select splitByChar('/', file_name)[-1] as file from system.s3queue
            where zookeeper_path ilike '%{table_name}%' and status = 'Processed' and rows_processed > 0 order by file
            """
        )
        # Drop empty entries: "".split("\n") is [""], which would be
        # reported as one processed file.
        processed_files = [
            name for name in processed_output.strip().split("\n") if name
        ]
        logging.debug(
            f"Processed files: {len(processed_files)}/{files_to_generate}: {processed_files}"
        )
        logging.debug(f"Processed rows: {count}/{files_to_generate}")

        info = node.query(
            f"""
            select concat('test_', toString(number), '.csv') as file from numbers({files_to_generate})
            where file not in (select splitByChar('/', file_name)[-1] from system.s3queue
            where zookeeper_path ilike '%{table_name}%' and status = 'Processed' and rows_processed > 0)
            """
        )
        logging.debug(f"Unprocessed files: {info}")

        # Put the numbers into the assertion so the failure is self-explanatory.
        assert False, f"Processed rows: {count}/{files_to_generate}"

    stored_rows = set()
    for dst_table in dst_tables:
        output = node.query(f"SELECT column1, column2, column3 FROM {dst_table}")
        stored_rows.update(
            tuple(map(int, line.split())) for line in output.splitlines()
        )
    assert stored_rows == {tuple(values) for values in total_values}

    # Checking that all files were processed only once
    time.sleep(10)
    assert get_total_count() == files_to_generate

    if mode == "ordered":
        zk = started_cluster.get_kazoo_client("zoo1")
        processed_nodes = zk.get_children(f"{keeper_path}/buckets/")
        assert len(processed_nodes) == shards_num
|
|
|
|
|
|
@pytest.mark.parametrize(
    "mode, processing_threads",
    [
        pytest.param("unordered", 1),
        pytest.param("unordered", 8),
        pytest.param("ordered", 1),
        pytest.param("ordered", 8),
    ],
)
def test_shards_distributed(started_cluster, mode, processing_threads):
    """Check that two servers sharing one keeper path split the work.

    Creates the same queue table (same `keeper_path`, `s3queue_buckets = 2`)
    and a materialized view on "instance" and "instance2", generates 300 files
    of 50 rows each, and expects the two destination tables to hold exactly
    `total_rows` rows together, with each server having ingested something.
    The total must be unchanged ten seconds later and again after restarting
    the first server. In "ordered" mode the number of children of
    `{keeper_path}/buckets/` must equal `shards_num`.
    """
    node = started_cluster.instances["instance"]
    node_2 = started_cluster.instances["instance2"]
    table_name = f"test_shards_distributed_{mode}_{processing_threads}"
    dst_table_name = f"{table_name}_dst"
    keeper_path = f"/clickhouse/test_{table_name}"
    files_path = f"{table_name}_data"
    files_to_generate = 300
    row_num = 50
    total_rows = row_num * files_to_generate
    shards_num = 2

    for instance in [node, node_2]:
        create_table(
            started_cluster,
            instance,
            table_name,
            mode,
            files_path,
            additional_settings={
                "keeper_path": keeper_path,
                "s3queue_processing_threads_num": processing_threads,
                "s3queue_buckets": shards_num,
            },
        )

    for instance in [node, node_2]:
        create_mv(instance, table_name, dst_table_name)

    total_values = generate_random_files(
        started_cluster, files_path, files_to_generate, row_num=row_num
    )

    def get_count(node, table_name):
        return int(run_query(node, f"SELECT count() FROM {table_name}"))

    def get_total_count():
        return get_count(node, dst_table_name) + get_count(node_2, dst_table_name)

    def get_processed_by(instance):
        """List base names of files `instance` reports as processed with rows."""
        output = instance.query(
            f"""
            select splitByChar('/', file_name)[-1] as file from system.s3queue where zookeeper_path ilike '%{table_name}%' and status = 'Processed' and rows_processed > 0 order by file
            """
        )
        # Drop empty entries: "".split("\n") is [""], which would both
        # inflate the count and show up as a bogus intersecting "file".
        return [name for name in output.strip().split("\n") if name]

    for _ in range(30):
        if get_total_count() == total_rows:
            break
        time.sleep(1)

    count = get_total_count()
    if count != total_rows:
        files1 = get_processed_by(node)
        logging.debug(
            f"Processed files by node 1: {len(files1)}/{files_to_generate}"
        )
        files2 = get_processed_by(node_2)
        logging.debug(
            f"Processed files by node 2: {len(files2)}/{files_to_generate}"
        )

        # Rows are compared with the expected number of rows, not of files.
        logging.debug(f"Processed rows: {count}/{total_rows}")

        info = node.query(
            f"""
            select concat('test_', toString(number), '.csv') as file from numbers({files_to_generate})
            where file not in (select splitByChar('/', file_name)[-1] from clusterAllReplicas(default, system.s3queue)
            where zookeeper_path ilike '%{table_name}%' and status = 'Processed' and rows_processed > 0)
            """
        )
        logging.debug(f"Unprocessed files: {info}")

        seen_by_node_2 = set(files2)
        intersecting = [name for name in files1 if name in seen_by_node_2]
        logging.debug(f"Intersecting files: {intersecting}")

        # Put the numbers into the assertion so the failure is self-explanatory.
        assert False, f"Processed rows: {count}/{total_rows}"

    get_query = f"SELECT column1, column2, column3 FROM {dst_table_name}"
    res1 = [list(map(int, l.split())) for l in run_query(node, get_query).splitlines()]
    res2 = [
        list(map(int, l.split())) for l in run_query(node_2, get_query).splitlines()
    ]

    assert len(res1) + len(res2) == total_rows

    # Checking that all engines have made progress
    assert len(res1) > 0
    assert len(res2) > 0

    assert {tuple(v) for v in res1 + res2} == {tuple(i) for i in total_values}

    # Checking that all files were processed only once
    time.sleep(10)
    assert get_total_count() == total_rows

    if mode == "ordered":
        zk = started_cluster.get_kazoo_client("zoo1")
        processed_nodes = zk.get_children(f"{keeper_path}/buckets/")
        assert len(processed_nodes) == shards_num

    # The total must also survive a server restart.
    node.restart_clickhouse()
    time.sleep(10)
    assert get_total_count() == total_rows
|
|
|
|
|
|
def test_settings_check(started_cluster):
    """Check that a mismatching `s3queue_buckets` value is rejected.

    The table is first created on "instance" with 2 buckets. Creating it on
    "instance2" with the same keeper path but 3 buckets must produce an error
    mentioning the stored and the local value. The first table is dropped at
    the end.
    """
    instances = started_cluster.instances
    node = instances["instance"]
    node_2 = instances["instance2"]
    table_name = "test_settings_check"
    keeper_path = f"/clickhouse/test_{table_name}"
    files_path = f"{table_name}_data"
    mode = "ordered"

    def settings_with_buckets(buckets):
        # Identical settings for both servers except for the bucket count.
        return {
            "keeper_path": keeper_path,
            "s3queue_processing_threads_num": 5,
            "s3queue_buckets": buckets,
        }

    create_table(
        started_cluster,
        node,
        table_name,
        mode,
        files_path,
        additional_settings=settings_with_buckets(2),
    )

    error = create_table(
        started_cluster,
        node_2,
        table_name,
        mode,
        files_path,
        additional_settings=settings_with_buckets(3),
        expect_error=True,
    )
    expected_message = "Existing table metadata in ZooKeeper differs in buckets setting. Stored in ZooKeeper: 2, local: 3"
    assert expected_message in error

    node.query(f"DROP TABLE {table_name} SYNC")
|
|
|
|
|
|
@pytest.mark.parametrize("processing_threads", [1, 5])
def test_processed_file_setting(started_cluster, processing_threads):
    """Check that `s3queue_last_processed_path` limits what gets ingested.

    An "ordered" queue table is created with the last processed path set to
    `{files_path}/test_5.csv`, then 10 single-row files are generated. Only 4
    rows are expected in the destination table (presumably the files named
    after test_5.csv -- the naming is decided by `generate_random_files`),
    both before and after a server restart.
    """
    node = started_cluster.instances["instance"]
    table_name = f"test_processed_file_setting_{processing_threads}"
    dst_table_name = f"{table_name}_dst"
    keeper_path = f"/clickhouse/test_{table_name}_{processing_threads}"
    files_path = f"{table_name}_data"
    files_to_generate = 10
    expected_rows = 4

    queue_settings = {
        "keeper_path": keeper_path,
        "s3queue_processing_threads_num": processing_threads,
        "s3queue_last_processed_path": f"{files_path}/test_5.csv",
    }
    create_table(
        started_cluster,
        node,
        table_name,
        "ordered",
        files_path,
        additional_settings=queue_settings,
    )
    generate_random_files(
        started_cluster, files_path, files_to_generate, start_ind=0, row_num=1
    )

    create_mv(node, table_name, dst_table_name)

    def count_rows():
        return int(node.query(f"SELECT count() FROM {dst_table_name}"))

    def wait_and_check():
        # Poll for up to 20 attempts, then assert on a fresh count.
        for _ in range(20):
            if expected_rows == count_rows():
                break
            time.sleep(1)
        assert expected_rows == count_rows()

    wait_and_check()

    node.restart_clickhouse()
    time.sleep(10)

    wait_and_check()
|
|
|
|
|
|
@pytest.mark.parametrize("processing_threads", [1, 5])
def test_processed_file_setting_distributed(started_cluster, processing_threads):
    """Two-server variant of the `s3queue_last_processed_path` check.

    The same "ordered" queue table (shared keeper path, 2 buckets, last
    processed path `{files_path}/test_5.csv`) is created on "instance" and
    "instance2". After generating 10 single-row files, the two destination
    tables together must hold 4 rows, both before and after restarting both
    servers.
    """
    node = started_cluster.instances["instance"]
    node_2 = started_cluster.instances["instance2"]
    both_nodes = [node, node_2]
    table_name = f"test_processed_file_setting_distributed_{processing_threads}"
    dst_table_name = f"{table_name}_dst"
    keeper_path = f"/clickhouse/test_{table_name}"
    files_path = f"{table_name}_data"
    files_to_generate = 10
    expected_rows = 4

    for instance in both_nodes:
        create_table(
            started_cluster,
            instance,
            table_name,
            "ordered",
            files_path,
            additional_settings={
                "keeper_path": keeper_path,
                "s3queue_processing_threads_num": processing_threads,
                "s3queue_last_processed_path": f"{files_path}/test_5.csv",
                "s3queue_buckets": 2,
            },
        )

    generate_random_files(
        started_cluster, files_path, files_to_generate, start_ind=0, row_num=1
    )

    for instance in both_nodes:
        create_mv(instance, table_name, dst_table_name)

    def count_rows():
        # Sum of the destination table sizes on both servers.
        query = f"SELECT count() FROM {dst_table_name}"
        first = int(node.query(query))
        second = int(node_2.query(query))
        return first + second

    def wait_and_check():
        # Poll for up to 20 attempts, then assert on a fresh count.
        for _ in range(20):
            if expected_rows == count_rows():
                break
            time.sleep(1)
        assert expected_rows == count_rows()

    wait_and_check()

    for instance in both_nodes:
        instance.restart_clickhouse()

    time.sleep(10)
    wait_and_check()
|
|
|
|
|
|
def test_upgrade(started_cluster):
    """Check that ingested rows are still there after a version upgrade.

    An "ordered" queue table on "old_instance" ingests 10 single-row files;
    once the destination table holds 10 rows the server is restarted with the
    latest version and the row count must still be 10.
    """
    node = started_cluster.instances["old_instance"]

    table_name = "test_upgrade"
    dst_table_name = f"{table_name}_dst"
    keeper_path = f"/clickhouse/test_{table_name}"
    files_path = f"{table_name}_data"
    files_to_generate = 10
    expected_rows = 10

    create_table(
        started_cluster,
        node,
        table_name,
        "ordered",
        files_path,
        additional_settings={"keeper_path": keeper_path},
    )
    generate_random_files(
        started_cluster, files_path, files_to_generate, start_ind=0, row_num=1
    )

    create_mv(node, table_name, dst_table_name)

    def count_rows():
        return int(node.query(f"SELECT count() FROM {dst_table_name}"))

    # Give background processing up to 20 polls to finish.
    attempts_left = 20
    while attempts_left > 0 and expected_rows != count_rows():
        attempts_left -= 1
        time.sleep(1)

    assert expected_rows == count_rows()

    node.restart_with_latest_version()

    assert expected_rows == count_rows()
|
|
|
|
|
|
def test_exception_during_insert(started_cluster):
    """Check recovery after inserts fail with "Too many parts".

    On "instance_too_many_parts" the test waits for the "Too many parts"
    failure in the server log and checks that `system.s3queue` recorded an
    exception containing that text. It then rewrites
    `parts_to_throw_insert>0` to `parts_to_throw_insert>10` in the merge tree
    config, restarts the server, and expects all 10 rows to arrive.
    """
    node = started_cluster.instances["instance_too_many_parts"]

    table_name = "test_exception_during_insert"
    dst_table_name = f"{table_name}_dst"
    keeper_path = f"/clickhouse/test_{table_name}"
    files_path = f"{table_name}_data"
    files_to_generate = 10
    expected_rows = 10

    create_table(
        started_cluster,
        node,
        table_name,
        "unordered",
        files_path,
        additional_settings={"keeper_path": keeper_path},
    )
    generate_random_files(
        started_cluster, files_path, files_to_generate, start_ind=0, row_num=1
    )

    create_mv(node, table_name, dst_table_name)

    node.wait_for_log_line(
        "Failed to process data: Code: 252. DB::Exception: Too many parts"
    )

    time.sleep(2)
    recorded_exceptions = node.query(
        f"SELECT exception FROM system.s3queue WHERE zookeeper_path ilike '%{table_name}%' and notEmpty(exception)"
    )
    assert "Too many parts" in recorded_exceptions

    # Lift the limit that made every insert fail, then restart to apply it.
    node.replace_in_config(
        "/etc/clickhouse-server/config.d/merge_tree.xml",
        "parts_to_throw_insert>0",
        "parts_to_throw_insert>10",
    )
    node.restart_clickhouse()

    def count_rows():
        return int(node.query(f"SELECT count() FROM {dst_table_name}"))

    attempts_left = 20
    while attempts_left > 0 and expected_rows != count_rows():
        attempts_left -= 1
        time.sleep(1)
    assert expected_rows == count_rows()
|
|
|
|
|
|
def test_commit_on_limit(started_cluster):
    """Check processed/failed bookkeeping with a commit limit of 10 files.

    Besides 10 generated single-row files, five extra files are uploaded:
    four with the row `1,1,1` and one (`test_9999.csv`) whose first field is
    the string "failed". The test waits until `test_999999.csv` is reported
    as processed, expects the `ObjectStorageQueueFailedFiles` event to equal
    1, every generated file to be listed as processed, and `test_9999.csv` to
    be listed as failed and not as processed.
    """
    node = started_cluster.instances["instance"]

    table_name = "test_commit_on_limit"
    dst_table_name = f"{table_name}_dst"
    keeper_path = f"/clickhouse/test_{table_name}"
    files_path = f"{table_name}_data"
    files_to_generate = 10

    create_table(
        started_cluster,
        node,
        table_name,
        "ordered",
        files_path,
        additional_settings={
            "keeper_path": keeper_path,
            "s3queue_processing_threads_num": 1,
            "s3queue_loading_retries": 0,
            "s3queue_max_processed_files_before_commit": 10,
        },
    )
    generate_random_files(
        started_cluster, files_path, files_to_generate, start_ind=0, row_num=1
    )

    def as_csv(rows):
        # One comma-separated line per row, newline-terminated, as bytes.
        lines = [",".join(str(cell) for cell in row) for row in rows]
        return ("\n".join(lines) + "\n").encode()

    incorrect_values_csv = as_csv([["failed", 1, 1]])
    correct_values_csv = as_csv([[1, 1, 1]])

    # Uploaded in this order; only test_9999.csv carries the bad row.
    extra_files = [
        ("test_99.csv", correct_values_csv),
        ("test_999.csv", correct_values_csv),
        ("test_9999.csv", incorrect_values_csv),
        ("test_99999.csv", correct_values_csv),
        ("test_999999.csv", correct_values_csv),
    ]
    for file_name, content in extra_files:
        put_s3_file_content(started_cluster, f"{files_path}/{file_name}", content)

    create_mv(node, table_name, dst_table_name)

    def list_processed():
        output = node.query(
            f"SELECT file_name FROM system.s3queue WHERE zookeeper_path ilike '%{table_name}%' and status = 'Processed' and rows_processed > 0 "
        )
        return output.strip().split("\n")

    def list_failed():
        output = node.query(
            f"SELECT file_name FROM system.s3queue WHERE zookeeper_path ilike '%{table_name}%' and status = 'Failed'"
        )
        return output.strip().split("\n")

    last_file = "test_999999.csv"
    attempts_left = 30
    while attempts_left > 0 and last_file not in list_processed():
        attempts_left -= 1
        time.sleep(1)

    assert last_file in list_processed()

    failed_files_event = node.query(
        "SELECT value FROM system.events WHERE name = 'ObjectStorageQueueFailedFiles' SETTINGS system_events_show_zero_values=1"
    )
    assert 1 == int(failed_files_event)

    processed = list_processed()
    for index in range(files_to_generate):
        assert f"test_{index}.csv" in processed

    failed = list_failed()
    for bad_file in ["test_9999.csv"]:
        assert bad_file not in processed
        assert bad_file in failed
|