ClickHouse/tests/integration/test_replicated_merge_tree_s3/test.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

163 lines
4.9 KiB
Python
Raw Normal View History

import logging
import random
import string
import pytest
from helpers.cluster import ClickHouseCluster
2022-08-24 13:01:21 +00:00
TABLE_NAME = "s3_test"
@pytest.fixture(scope="module")
def cluster():
try:
cluster = ClickHouseCluster(__file__)
cluster.add_instance(
"node1",
main_configs=["configs/config.d/storage_conf.xml"],
macros={"replica": "1"},
with_minio=True,
with_zookeeper=True,
)
cluster.add_instance(
"node2",
main_configs=["configs/config.d/storage_conf.xml"],
macros={"replica": "2"},
with_zookeeper=True,
)
cluster.add_instance(
"node3",
main_configs=["configs/config.d/storage_conf.xml"],
macros={"replica": "3"},
with_zookeeper=True,
)
logging.info("Starting cluster...")
cluster.start()
logging.info("Cluster started")
yield cluster
finally:
cluster.shutdown()
FILES_OVERHEAD = 1
FILES_OVERHEAD_PER_COLUMN = 2 # Data and mark files
2020-09-10 18:43:02 +00:00
FILES_OVERHEAD_PER_PART_WIDE = FILES_OVERHEAD_PER_COLUMN * 3 + 2 + 6 + 1
FILES_OVERHEAD_PER_PART_COMPACT = 10 + 1
def random_string(length):
letters = string.ascii_letters
return "".join(random.choice(letters) for i in range(length))
def generate_values(date_str, count, sign=1):
data = [[date_str, sign * (i + 1), random_string(10)] for i in range(count)]
data.sort(key=lambda tup: tup[1])
return ",".join(["('{}',{},'{}')".format(x, y, z) for x, y, z in data])
2020-09-10 18:43:02 +00:00
def create_table(cluster, additional_settings=None):
2022-08-24 13:01:21 +00:00
settings = {
"storage_policy": "s3",
}
settings.update(additional_settings)
create_table_statement = f"""
CREATE TABLE {TABLE_NAME} ON CLUSTER cluster(
dt Date,
id Int64,
data String,
INDEX min_max (id) TYPE minmax GRANULARITY 3
2020-09-21 21:09:50 +00:00
) ENGINE=ReplicatedMergeTree()
PARTITION BY dt
ORDER BY (dt, id)
2022-08-24 13:01:21 +00:00
SETTINGS {",".join((k+"="+repr(v) for k, v in settings.items()))}
"""
2020-10-02 16:54:07 +00:00
list(cluster.instances.values())[0].query(create_table_statement)
2022-08-24 13:01:21 +00:00
def insert(cluster, node_idxs, verify=True):
all_values = ""
for node_idx in node_idxs:
node = cluster.instances["node" + str(node_idx)]
values = generate_values("2020-01-0" + str(node_idx), 4096)
node.query(
f"INSERT INTO {TABLE_NAME} VALUES {values}",
settings={"insert_quorum": 3},
)
if node_idx != 1:
all_values += ","
all_values += values
if verify:
for node_idx in node_idxs:
node = cluster.instances["node" + str(node_idx)]
assert (
node.query(
f"SELECT * FROM {TABLE_NAME} order by dt, id FORMAT Values",
settings={"select_sequential_consistency": 1},
)
== all_values
)
@pytest.fixture(autouse=True)
def drop_table(cluster):
yield
2020-10-02 16:54:07 +00:00
for node in list(cluster.instances.values()):
2022-08-24 13:01:21 +00:00
node.query(f"DROP TABLE IF EXISTS {TABLE_NAME}")
minio = cluster.minio_client
# Remove extra objects to prevent tests cascade failing
2022-09-05 00:13:51 +00:00
for obj in list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)):
minio.remove_object(cluster.minio_bucket, obj.object_name)
2020-09-10 18:43:02 +00:00
@pytest.mark.parametrize(
"min_rows_for_wide_part,files_per_part",
[(0, FILES_OVERHEAD_PER_PART_WIDE), (8192, FILES_OVERHEAD_PER_PART_COMPACT)],
2020-09-10 18:43:02 +00:00
)
def test_insert_select_replicated(cluster, min_rows_for_wide_part, files_per_part):
create_table(
cluster,
2022-08-25 10:10:46 +00:00
additional_settings={"min_rows_for_wide_part": min_rows_for_wide_part},
)
2022-08-25 21:01:31 +00:00
insert(cluster, node_idxs=[1, 2, 3], verify=True)
minio = cluster.minio_client
2022-09-05 00:23:36 +00:00
assert len(
list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True))
) == 3 * (FILES_OVERHEAD + files_per_part * 3)
2022-08-24 13:01:21 +00:00
def test_drop_cache_on_cluster(cluster):
create_table(
cluster,
2022-08-24 22:20:50 +00:00
additional_settings={"storage_policy": "s3_cache"},
2022-08-24 13:01:21 +00:00
)
insert(cluster, node_idxs=[1, 2, 3], verify=True)
node1 = cluster.instances["node1"]
node2 = cluster.instances["node2"]
2022-08-24 13:03:02 +00:00
node3 = cluster.instances["node3"]
2022-08-24 13:01:21 +00:00
2022-08-24 22:20:50 +00:00
node1.query(
f"select * from clusterAllReplicas(cluster, default, {TABLE_NAME}) format Null"
)
2022-08-24 13:01:21 +00:00
assert int(node1.query("select count() from system.filesystem_cache")) > 0
assert int(node2.query("select count() from system.filesystem_cache")) > 0
assert int(node3.query("select count() from system.filesystem_cache")) > 0
node1.query("system drop filesystem cache on cluster cluster")
assert int(node1.query("select count() from system.filesystem_cache")) == 0
assert int(node2.query("select count() from system.filesystem_cache")) == 0
assert int(node3.query("select count() from system.filesystem_cache")) == 0