mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-27 18:12:02 +00:00
163 lines
4.9 KiB
Python
163 lines
4.9 KiB
Python
import logging
|
|
import random
|
|
import string
|
|
|
|
import pytest
|
|
from helpers.cluster import ClickHouseCluster
|
|
|
|
TABLE_NAME = "s3_test"
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
def cluster():
|
|
try:
|
|
cluster = ClickHouseCluster(__file__)
|
|
|
|
cluster.add_instance(
|
|
"node1",
|
|
main_configs=["configs/config.d/storage_conf.xml"],
|
|
macros={"replica": "1"},
|
|
with_minio=True,
|
|
with_zookeeper=True,
|
|
)
|
|
cluster.add_instance(
|
|
"node2",
|
|
main_configs=["configs/config.d/storage_conf.xml"],
|
|
macros={"replica": "2"},
|
|
with_zookeeper=True,
|
|
)
|
|
cluster.add_instance(
|
|
"node3",
|
|
main_configs=["configs/config.d/storage_conf.xml"],
|
|
macros={"replica": "3"},
|
|
with_zookeeper=True,
|
|
)
|
|
|
|
logging.info("Starting cluster...")
|
|
cluster.start()
|
|
logging.info("Cluster started")
|
|
|
|
yield cluster
|
|
finally:
|
|
cluster.shutdown()
|
|
|
|
|
|
FILES_OVERHEAD = 1
|
|
FILES_OVERHEAD_PER_COLUMN = 2 # Data and mark files
|
|
FILES_OVERHEAD_PER_PART_WIDE = FILES_OVERHEAD_PER_COLUMN * 3 + 2 + 6 + 1
|
|
FILES_OVERHEAD_PER_PART_COMPACT = 10 + 1
|
|
|
|
|
|
def random_string(length):
|
|
letters = string.ascii_letters
|
|
return "".join(random.choice(letters) for i in range(length))
|
|
|
|
|
|
def generate_values(date_str, count, sign=1):
|
|
data = [[date_str, sign * (i + 1), random_string(10)] for i in range(count)]
|
|
data.sort(key=lambda tup: tup[1])
|
|
return ",".join(["('{}',{},'{}')".format(x, y, z) for x, y, z in data])
|
|
|
|
|
|
def create_table(cluster, additional_settings=None):
|
|
settings = {
|
|
"storage_policy": "s3",
|
|
}
|
|
settings.update(additional_settings)
|
|
|
|
create_table_statement = f"""
|
|
CREATE TABLE {TABLE_NAME} ON CLUSTER cluster(
|
|
dt Date,
|
|
id Int64,
|
|
data String,
|
|
INDEX min_max (id) TYPE minmax GRANULARITY 3
|
|
) ENGINE=ReplicatedMergeTree()
|
|
PARTITION BY dt
|
|
ORDER BY (dt, id)
|
|
SETTINGS {",".join((k+"="+repr(v) for k, v in settings.items()))}
|
|
"""
|
|
|
|
list(cluster.instances.values())[0].query(create_table_statement)
|
|
|
|
|
|
def insert(cluster, node_idxs, verify=True):
|
|
all_values = ""
|
|
for node_idx in node_idxs:
|
|
node = cluster.instances["node" + str(node_idx)]
|
|
values = generate_values("2020-01-0" + str(node_idx), 4096)
|
|
node.query(
|
|
f"INSERT INTO {TABLE_NAME} VALUES {values}",
|
|
settings={"insert_quorum": 3},
|
|
)
|
|
if node_idx != 1:
|
|
all_values += ","
|
|
all_values += values
|
|
|
|
if verify:
|
|
for node_idx in node_idxs:
|
|
node = cluster.instances["node" + str(node_idx)]
|
|
assert (
|
|
node.query(
|
|
f"SELECT * FROM {TABLE_NAME} order by dt, id FORMAT Values",
|
|
settings={"select_sequential_consistency": 1},
|
|
)
|
|
== all_values
|
|
)
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def drop_table(cluster):
|
|
yield
|
|
for node in list(cluster.instances.values()):
|
|
node.query(f"DROP TABLE IF EXISTS {TABLE_NAME}")
|
|
|
|
minio = cluster.minio_client
|
|
# Remove extra objects to prevent tests cascade failing
|
|
for obj in list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)):
|
|
minio.remove_object(cluster.minio_bucket, obj.object_name)
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"min_rows_for_wide_part,files_per_part",
|
|
[(0, FILES_OVERHEAD_PER_PART_WIDE), (8192, FILES_OVERHEAD_PER_PART_COMPACT)],
|
|
)
|
|
def test_insert_select_replicated(cluster, min_rows_for_wide_part, files_per_part):
|
|
create_table(
|
|
cluster,
|
|
additional_settings={"min_rows_for_wide_part": min_rows_for_wide_part},
|
|
)
|
|
|
|
insert(cluster, node_idxs=[1, 2, 3], verify=True)
|
|
|
|
minio = cluster.minio_client
|
|
assert len(
|
|
list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True))
|
|
) == 3 * (FILES_OVERHEAD + files_per_part * 3)
|
|
|
|
|
|
def test_drop_cache_on_cluster(cluster):
|
|
create_table(
|
|
cluster,
|
|
additional_settings={"storage_policy": "s3_cache"},
|
|
)
|
|
|
|
insert(cluster, node_idxs=[1, 2, 3], verify=True)
|
|
|
|
node1 = cluster.instances["node1"]
|
|
node2 = cluster.instances["node2"]
|
|
node3 = cluster.instances["node3"]
|
|
|
|
node1.query(
|
|
f"select * from clusterAllReplicas(cluster, default, {TABLE_NAME}) format Null"
|
|
)
|
|
|
|
assert int(node1.query("select count() from system.filesystem_cache")) > 0
|
|
assert int(node2.query("select count() from system.filesystem_cache")) > 0
|
|
assert int(node3.query("select count() from system.filesystem_cache")) > 0
|
|
|
|
node1.query("system drop filesystem cache on cluster cluster")
|
|
|
|
assert int(node1.query("select count() from system.filesystem_cache")) == 0
|
|
assert int(node2.query("select count() from system.filesystem_cache")) == 0
|
|
assert int(node3.query("select count() from system.filesystem_cache")) == 0
|