ClickHouse/tests/integration/test_replicated_merge_tree_s3/test.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

173 lines
5.1 KiB
Python
Raw Normal View History

import logging
import random
import string
import pytest
from helpers.cluster import ClickHouseCluster
2022-08-24 13:01:21 +00:00
# Name of the table that the helpers and tests in this file create, insert
# into, select from and drop.
TABLE_NAME = "s3_test"
@pytest.fixture(scope="module")
def cluster():
    """Start a three-node ClickHouse cluster once per module and yield it.

    All three instances ("node1".."node3") load
    ``configs/config.d/storage_conf.xml``, get a distinct ``replica`` macro
    and are registered with ZooKeeper; "node1" additionally requests MinIO.
    The cluster is shut down when the module's tests are finished, or when
    configuring/starting it fails.
    """
    # Build the cluster object *before* entering ``try``: if the constructor
    # raised inside it, the ``finally`` clause would hit an unbound local and
    # replace the real error with an UnboundLocalError.
    ch_cluster = ClickHouseCluster(__file__)
    try:
        ch_cluster.add_instance(
            "node1",
            main_configs=["configs/config.d/storage_conf.xml"],
            macros={"replica": "1"},
            with_minio=True,
            with_zookeeper=True,
        )
        ch_cluster.add_instance(
            "node2",
            main_configs=["configs/config.d/storage_conf.xml"],
            macros={"replica": "2"},
            with_zookeeper=True,
        )
        ch_cluster.add_instance(
            "node3",
            main_configs=["configs/config.d/storage_conf.xml"],
            macros={"replica": "3"},
            with_zookeeper=True,
        )

        logging.info("Starting cluster...")
        ch_cluster.start()
        logging.info("Cluster started")

        yield ch_cluster
    finally:
        ch_cluster.shutdown()
# File-count constants used by the tests to compute how many objects are
# expected under "data/" in the MinIO bucket after the inserts.
FILES_OVERHEAD = 1
FILES_OVERHEAD_PER_COLUMN = 2  # Data and mark files
FILES_OVERHEAD_DEFAULT_COMPRESSION_CODEC = 1
FILES_OVERHEAD_METADATA_VERSION = 1

# Wide part: per-column files (the factor 3 presumably matches the three
# table columns dt, id, data -- confirm) plus fixed extras.
# NOTE(review): the breakdown of the literal 2 and 6 is not stated here.
FILES_OVERHEAD_PER_PART_WIDE = (
    3 * FILES_OVERHEAD_PER_COLUMN
    + 2
    + 6
    + FILES_OVERHEAD_METADATA_VERSION
    + FILES_OVERHEAD_DEFAULT_COMPRESSION_CODEC
)

# Compact part: a fixed base of 10 files plus the same two one-file extras.
FILES_OVERHEAD_PER_PART_COMPACT = (
    FILES_OVERHEAD_METADATA_VERSION + FILES_OVERHEAD_DEFAULT_COMPRESSION_CODEC + 10
)
def random_string(length):
    """Return a string of ``length`` characters drawn from ASCII letters."""
    alphabet = string.ascii_letters
    picked = [random.choice(alphabet) for _ in range(length)]
    return "".join(picked)
def generate_values(date_str, count, sign=1):
    """Build the body of an ``INSERT ... VALUES`` clause with ``count`` rows.

    Each row is ``('<date_str>',<n>,'<10 random letters>')`` where ``n`` is
    ``sign * 1 .. sign * count``. Rows are ordered by that numeric field and
    joined with commas; an empty string is returned for ``count == 0``.
    """
    rows = sorted(
        ([date_str, sign * number, random_string(10)] for number in range(1, count + 1)),
        key=lambda row: row[1],
    )
    return ",".join(f"('{day}',{number},'{payload}')" for day, number, payload in rows)
2020-09-10 18:43:02 +00:00
def create_table(cluster, additional_settings=None):
    """Create the replicated test table on every node via ``ON CLUSTER``.

    Args:
        cluster: Started cluster object; the statement is sent to the first
            entry of ``cluster.instances``.
        additional_settings: Optional mapping of MergeTree setting name to
            value. Entries override the default ``storage_policy = 's3'``.
            ``None`` (the default) or an empty mapping keeps the defaults.
    """
    settings = {
        "storage_policy": "s3",
    }
    # ``dict.update(None)`` raises TypeError, so the declared default of
    # ``None`` must be skipped rather than passed through.
    if additional_settings:
        settings.update(additional_settings)

    # Values are rendered with repr() so strings are quoted and numbers are not.
    settings_clause = ",".join(f"{name}={value!r}" for name, value in settings.items())

    create_table_statement = f"""
        CREATE TABLE {TABLE_NAME} ON CLUSTER cluster(
            dt Date,
            id Int64,
            data String,
            INDEX min_max (id) TYPE minmax GRANULARITY 3
        ) ENGINE=ReplicatedMergeTree()
        PARTITION BY dt
        ORDER BY (dt, id)
        SETTINGS {settings_clause}
        """

    first_node = list(cluster.instances.values())[0]
    first_node.query(create_table_statement)
2022-08-24 13:01:21 +00:00
def insert(cluster, node_idxs, verify=True):
    """Insert 4096 generated rows through each listed node, then optionally verify.

    For every index ``i`` in ``node_idxs`` the rows are dated ``2020-01-0<i>``
    and written through instance ``node<i>`` with ``insert_quorum = 3``.

    Args:
        cluster: Started cluster object exposing ``instances`` by name.
        node_idxs: Node indexes to insert through. The date is built by
            appending the index to ``"2020-01-0"``, so only single-digit
            indexes give a valid date. The expected result is concatenated in
            this order while the check reads ``order by dt, id``, so indexes
            should be passed in ascending order.
        verify: When true, read the whole table back from each listed node
            with ``select_sequential_consistency = 1`` and assert it equals
            everything inserted by this call.
    """
    inserted_chunks = []
    for node_idx in node_idxs:
        node = cluster.instances[f"node{node_idx}"]
        values = generate_values(f"2020-01-0{node_idx}", 4096)
        node.query(
            f"INSERT INTO {TABLE_NAME} VALUES {values}",
            settings={"insert_quorum": 3},
        )
        inserted_chunks.append(values)

    if not verify:
        return

    # Separate chunks by position, not by node index value: the expected
    # string must be well-formed even when the list does not start with 1.
    all_values = ",".join(inserted_chunks)

    for node_idx in node_idxs:
        node = cluster.instances[f"node{node_idx}"]
        assert (
            node.query(
                f"SELECT * FROM {TABLE_NAME} order by dt, id FORMAT Values",
                settings={"select_sequential_consistency": 1},
            )
            == all_values
        )
@pytest.fixture(autouse=True)
def drop_table(cluster):
    """After each test, drop the test table on every node and empty the bucket prefix."""
    yield

    for instance in cluster.instances.values():
        instance.query(f"DROP TABLE IF EXISTS {TABLE_NAME}")

    # Remove extra objects to prevent tests cascade failing
    client = cluster.minio_client
    # The listing is materialised before any object is removed.
    leftovers = list(client.list_objects(cluster.minio_bucket, "data/", recursive=True))
    for leftover in leftovers:
        client.remove_object(cluster.minio_bucket, leftover.object_name)
2020-09-10 18:43:02 +00:00
@pytest.mark.parametrize(
    "min_rows_for_wide_part,files_per_part",
    [(0, FILES_OVERHEAD_PER_PART_WIDE), (8192, FILES_OVERHEAD_PER_PART_COMPACT)],
)
def test_insert_select_replicated(cluster, min_rows_for_wide_part, files_per_part):
    """Insert through all three nodes and check the number of objects in S3.

    The case with ``min_rows_for_wide_part = 0`` is paired with the wide-part
    file count, the case with 8192 with the compact-part file count.
    """
    table_settings = {"min_rows_for_wide_part": min_rows_for_wide_part}
    create_table(cluster, additional_settings=table_settings)

    insert(cluster, node_idxs=[1, 2, 3], verify=True)

    stored_objects = list(
        cluster.minio_client.list_objects(cluster.minio_bucket, "data/", recursive=True)
    )
    # Inner factor 3: one part per insert (three inserts above). Outer factor
    # 3: presumably one copy per replica -- confirm.
    expected_count = 3 * (FILES_OVERHEAD + files_per_part * 3)
    assert len(stored_objects) == expected_count
2022-08-24 13:01:21 +00:00
def test_drop_cache_on_cluster(cluster):
    """Check that ``system drop filesystem cache on cluster`` empties every node's cache.

    The table uses the ``s3_cache`` storage policy. After a read across all
    replicas, each node must report entries in ``system.filesystem_cache``;
    after the drop issued from node1, each node must report none.
    """
    create_table(cluster, additional_settings={"storage_policy": "s3_cache"})
    insert(cluster, node_idxs=[1, 2, 3], verify=True)

    replicas = [cluster.instances[name] for name in ("node1", "node2", "node3")]
    initiator = replicas[0]

    def cached_entries(node):
        # Number of rows the node currently reports in its filesystem cache table.
        return int(node.query("select count() from system.filesystem_cache"))

    # Read the table on every replica; the assertions below expect this to
    # leave cache entries on each node.
    initiator.query(
        f"select * from clusterAllReplicas(cluster, default, {TABLE_NAME}) format Null"
    )
    for replica in replicas:
        assert cached_entries(replica) > 0

    initiator.query("system drop filesystem cache on cluster cluster")
    for replica in replicas:
        assert cached_entries(replica) == 0