S3 zero copy replication: fix tests

This commit is contained in:
Anton Ivashkin 2020-10-23 13:01:40 +03:00
parent 652c56e74e
commit 1ffe0b1d03
6 changed files with 159 additions and 0 deletions

View File

@ -21,6 +21,7 @@
<merge_tree>
<min_bytes_for_wide_part>0</min_bytes_for_wide_part>
<allow_s3_zero_copy_replication>0</allow_s3_zero_copy_replication>
</merge_tree>
<remote_servers>

View File

@ -0,0 +1,50 @@
<yandex>
<storage_configuration>
<disks>
<s3>
<type>s3</type>
<endpoint>http://minio1:9001/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3>
</disks>
<policies>
<s3>
<volumes>
<main>
<disk>s3</disk>
</main>
</volumes>
</s3>
</policies>
</storage_configuration>
<merge_tree>
<min_bytes_for_wide_part>0</min_bytes_for_wide_part>
<allow_s3_zero_copy_replication>1</allow_s3_zero_copy_replication>
</merge_tree>
<remote_servers>
<cluster>
<shard>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
<replica>
<host>node2</host>
<port>9000</port>
</replica>
<replica>
<host>node3</host>
<port>9000</port>
</replica>
</shard>
</cluster>
</remote_servers>
<macros>
<shard>0</shard>
</macros>
</yandex>

View File

@ -0,0 +1,105 @@
import logging
import random
import string
import pytest
from helpers.cluster import ClickHouseCluster
logging.getLogger().setLevel(logging.INFO)
logging.getLogger().addHandler(logging.StreamHandler())
@pytest.fixture(scope="module")
def cluster():
try:
cluster = ClickHouseCluster(__file__)
cluster.add_instance("node1", main_configs=["configs/config.d/storage_conf.xml"], macros={'replica': '1'},
with_minio=True, with_zookeeper=True)
cluster.add_instance("node2", main_configs=["configs/config.d/storage_conf.xml"], macros={'replica': '2'},
with_zookeeper=True)
cluster.add_instance("node3", main_configs=["configs/config.d/storage_conf.xml"], macros={'replica': '3'},
with_zookeeper=True)
logging.info("Starting cluster...")
cluster.start()
logging.info("Cluster started")
yield cluster
finally:
cluster.shutdown()
FILES_OVERHEAD = 1
FILES_OVERHEAD_PER_COLUMN = 2 # Data and mark files
FILES_OVERHEAD_PER_PART_WIDE = FILES_OVERHEAD_PER_COLUMN * 3 + 2 + 6 + 1
FILES_OVERHEAD_PER_PART_COMPACT = 10 + 1
def random_string(length):
letters = string.ascii_letters
return ''.join(random.choice(letters) for i in range(length))
def generate_values(date_str, count, sign=1):
data = [[date_str, sign * (i + 1), random_string(10)] for i in range(count)]
data.sort(key=lambda tup: tup[1])
return ",".join(["('{}',{},'{}')".format(x, y, z) for x, y, z in data])
def create_table(cluster, additional_settings=None):
create_table_statement = """
CREATE TABLE s3_test ON CLUSTER cluster(
dt Date,
id Int64,
data String,
INDEX min_max (id) TYPE minmax GRANULARITY 3
) ENGINE=ReplicatedMergeTree()
PARTITION BY dt
ORDER BY (dt, id)
SETTINGS storage_policy='s3'
"""
if additional_settings:
create_table_statement += ","
create_table_statement += additional_settings
list(cluster.instances.values())[0].query(create_table_statement)
@pytest.fixture(autouse=True)
def drop_table(cluster):
yield
for node in list(cluster.instances.values()):
node.query("DROP TABLE IF EXISTS s3_test")
minio = cluster.minio_client
# Remove extra objects to prevent tests cascade failing
for obj in list(minio.list_objects(cluster.minio_bucket, 'data/')):
minio.remove_object(cluster.minio_bucket, obj.object_name)
@pytest.mark.parametrize(
"min_rows_for_wide_part,files_per_part",
[
(0, FILES_OVERHEAD_PER_PART_WIDE),
(8192, FILES_OVERHEAD_PER_PART_COMPACT)
]
)
def test_insert_select_replicated(cluster, min_rows_for_wide_part, files_per_part):
create_table(cluster, additional_settings="min_rows_for_wide_part={}".format(min_rows_for_wide_part))
all_values = ""
for node_idx in range(1, 4):
node = cluster.instances["node" + str(node_idx)]
values = generate_values("2020-01-0" + str(node_idx), 4096)
node.query("INSERT INTO s3_test VALUES {}".format(values), settings={"insert_quorum": 3})
if node_idx != 1:
all_values += ","
all_values += values
for node_idx in range(1, 4):
node = cluster.instances["node" + str(node_idx)]
assert node.query("SELECT * FROM s3_test order by dt, id FORMAT Values",
settings={"select_sequential_consistency": 1}) == all_values
minio = cluster.minio_client
assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == (3 * FILES_OVERHEAD) + (files_per_part * 3)

View File

@ -23,6 +23,7 @@
<merge_tree>
<min_bytes_for_wide_part>0</min_bytes_for_wide_part>
<old_parts_lifetime>2</old_parts_lifetime>
<allow_s3_zero_copy_replication>1</allow_s3_zero_copy_replication>
</merge_tree>
<remote_servers>

View File

@ -54,6 +54,7 @@ def test_s3_zero_copy_replication(cluster, policy):
)
node1.query("INSERT INTO s3_test VALUES (0,'data'),(1,'data')")
time.sleep(1)
assert node1.query("SELECT * FROM s3_test order by id FORMAT Values") == "(0,'data'),(1,'data')"
assert node2.query("SELECT * FROM s3_test order by id FORMAT Values") == "(0,'data'),(1,'data')"
@ -61,6 +62,7 @@ def test_s3_zero_copy_replication(cluster, policy):
assert get_large_objects_count(cluster) == 1
node2.query("INSERT INTO s3_test VALUES (2,'data'),(3,'data')")
time.sleep(1)
assert node2.query("SELECT * FROM s3_test order by id FORMAT Values") == "(0,'data'),(1,'data'),(2,'data'),(3,'data')"
assert node1.query("SELECT * FROM s3_test order by id FORMAT Values") == "(0,'data'),(1,'data'),(2,'data'),(3,'data')"