Finished the test draft for ATTACH PARTITION,

Extracted the part data corruption function into the helper.
This commit is contained in:
Mike Kot 2021-03-01 16:42:31 +03:00
parent 2b3b335eda
commit 5281314ac0
4 changed files with 103 additions and 32 deletions

View File

@ -0,0 +1,7 @@
def corrupt_part_data_on_disk(node, table, part_name, is_detached=False):
parts_table = "system.detached_parts" is is_detached else "system.parts"
part_path = node.query(
"SELECT path FROM " + parts_table + " WHERE table = '{}' and name = '{}'".format(table, part_name)).strip()
node.exec_in_container(['bash', '-c',
'cd {p} && ls *.bin | head -n 1 | xargs -I{{}} sh -c \'echo "1" >> $1\' -- {{}}'.format(
p=part_path)], privileged=True)

View File

@ -0,0 +1,21 @@
<yandex>
<remote_servers>
<test_cluster>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>node_1_1</host>
<port>9000</port>
</replica>
<replica>
<host>node_1_2</host>
<port>9000</port>
</replica>
<replica>
<host>node_1_3</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
</remote_servers>
</yandex>

View File

@ -1,49 +1,98 @@
import time
import pytest import pytest
from helpers.cluster import ClickHouseCluster from helpers.cluster import ClickHouseCluster
from helpers.cluster import ClickHouseKiller
from helpers.test_tools import assert_eq_with_retry
from helpers.network import PartitionManager
from helpers.corrupt_part_data_on_disk import corrupt_part_data_on_disk
def fill_node(node):
node.query(
'''
CREATE TABLE test(n UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test', '{replica}')
ORDER BY n PARTITION BY n % 10;
'''.format(replica=node.name))
cluster = ClickHouseCluster(__file__) cluster = ClickHouseCluster(__file__)
configs =["configs/remote_servers.xml"]
node1 = cluster.add_instance('node1') node_1 = cluster.add_instance('replica1', with_zookeeper=True, main_configs=configs)
node_2 = cluster.add_instance('replica2', with_zookeeper=True, main_configs=configs)
node_3 = cluster.add_instance('replica3', with_zookeeper=True, main_configs=configs)
@pytest.fixture(scope="module") @pytest.fixture(scope="module")
def start_cluster(): def start_cluster():
try: try:
cluster.start() cluster.start()
fill_node(node_1)
fill_node(node_2)
# the third node is filled after the DETACH query
yield cluster yield cluster
except Exception as ex:
print(ex)
finally: finally:
cluster.shutdown() cluster.shutdown()
def check_data(nodes, detached_parts):
for node in nodes:
for i in range(10):
assert node.query("SELECT count() FROM test WHERE key % 10 == " + str(i)) ==
"0\n" if i in detached_parts else "10\n"
# Check that ALTER TABLE ATTACH PARTITION does not fetch data from other replicas if it's present in the assert node.query("SELECT count() FROM system.parts WHERE table='test'") ==
# detached/ folder str(10 - len(detached_parts)) + "\n"
assert node.query("SELECT count() FROM system.detached_parts WHERE table='test'") ==
str(len(detached_parts)) + "\n"
# 1. Check that ALTER TABLE ATTACH PARTITION does not fetch data from other replicas if it's present in the
# detached/ folder.
# 2. Check that ALTER TABLE ATTACH PARTITION downloads the data from other replicas if the detached/ folder
# does not contain the part with the correct checksums.
def test_attach_without_fetching(start_cluster): def test_attach_without_fetching(start_cluster):
node1.query( # 0. Insert data on two replicas
"CREATE TABLE test (date Date, key Int32, value String) Engine=MergeTree ORDER BY key PARTITION by date") node_1.query("INSERT INTO test SELECT * FROM numbers(100)")
node1.query("INSERT INTO test SELECT toDate('2019-10-01'), number, toString(number) FROM numbers(100)") check_data([node_1, node_2], detached_parts=[])
assert node1.query("SELECT COUNT() FROM test WHERE key % 10 == 0") == "10\n" # 1. Detach the first three partition on the replicas
node1.query("ALTER TABLE test DETACH PARTITION '2019-10-01'") # This part will be fetched from other replicas as it would be missing in the detached/ folder
node_1.query("ALTER TABLE test DETACH PARTITION '0_0_0_0'")
# This part will be fetched from other replicas as the checksums won't match (we'll manually break the data).
node_1.query("ALTER TABLE test DETACH PARTITION '1_0_0_0'")
# This part will be copied locally and attached without fetch
node_1.query("ALTER TABLE test DETACH PARTITION '2_0_0_0'")
assert node1.query("SELECT COUNT() FROM test WHERE key % 10 == 0") == "0\n" check_data([node_1, node_2], detached_parts=[0, 1, 2])
assert node1.query("SELECT COUNT() FROM test") == "0\n"
# Break the network in the partition manager # 2. Create the third replica
# The data is not removed from detached/ so it's ok fill_node(node_3)
# to be sure output not empty # 3. Attach the first partition and check if it has been fetched correctly
node1.exec_in_container( node_3.query("ALTER TABLE test ATTACH PARTITION '0_0_0_0'")
['bash', '-c', 'find /var/lib/clickhouse/data/default/test/detached -name "checksums.txt" | grep -e ".*" '], check_data([node_1, node_2, node_3], detached_parts=[1, 2])
privileged=True, user='root')
node1.exec_in_container( # 4. Fetch the second partition to the third replica, break the data to corrupt the checksums,
['bash', '-c', 'find /var/lib/clickhouse/data/default/test/detached -name "checksums.txt" -delete'], # attach it and check if it also was fetched correctly.
privileged=True, user='root') node_3.query("ALTER TABLE test FETCH PARTITION '1_0_0_0' FROM '/clickhouse/tables/test'")
corrupt_part_data_on_disk(node_3, 'test', '1_0_0_0', is_detached=True)
node_3.query("ALTER TABLE test ATTACH PARTITION '1_0_0_0'")
node1.query("ALTER TABLE test ATTACH PARTITION '2019-10-01'") check_data([node_1, node_2, node_3], detached_parts=[2])
assert node1.query("SELECT COUNT() FROM test WHERE key % 10 == 0") == "10\n" # 5. Fetch the third partition to the third replica, break the network as so the replica won't be able to
assert node1.query("SELECT COUNT() FROM test") == "100\n" # download the data, attach the partition (and check it has been attached from the local data)
node_3.query("ALTER TABLE test FETCH PARTITION '2_0_0_0' FROM '/clickhouse/tables/test'")
with PartitionManager() as pm:
pm.partition_instances(node_1, node_3)
pm.partition_instances(node_2, node_3)
node_3.query("ALTER TABLE test ATTACH PARTITION '2_0_0_0'")
check_data([node_1, node_2, node_3], detached_parts=[])

View File

@ -3,6 +3,7 @@ import pytest
from helpers.cluster import ClickHouseCluster from helpers.cluster import ClickHouseCluster
from multiprocessing.dummy import Pool from multiprocessing.dummy import Pool
from helpers.network import PartitionManager from helpers.network import PartitionManager
from helpers.corrupt_part_data_on_disk import corrupt_part_data_on_disk
import time import time
cluster = ClickHouseCluster(__file__) cluster = ClickHouseCluster(__file__)
@ -25,13 +26,6 @@ def started_cluster():
finally: finally:
cluster.shutdown() cluster.shutdown()
def corrupt_data_part_on_disk(node, table, part_name):
part_path = node.query(
"SELECT path FROM system.parts WHERE table = '{}' and name = '{}'".format(table, part_name)).strip()
node.exec_in_container(['bash', '-c',
'cd {p} && ls *.bin | head -n 1 | xargs -I{{}} sh -c \'echo "1" >> $1\' -- {{}}'.format(
p=part_path)], privileged=True)
def test_merge_and_part_corruption(started_cluster): def test_merge_and_part_corruption(started_cluster):
node1.query("SYSTEM STOP REPLICATION QUEUES replicated_mt") node1.query("SYSTEM STOP REPLICATION QUEUES replicated_mt")
@ -43,7 +37,7 @@ def test_merge_and_part_corruption(started_cluster):
# Need to corrupt "border part" (left or right). If we will corrupt something in the middle # Need to corrupt "border part" (left or right). If we will corrupt something in the middle
# clickhouse will not consider merge as broken, because we have parts with the same min and max # clickhouse will not consider merge as broken, because we have parts with the same min and max
# block numbers. # block numbers.
corrupt_data_part_on_disk(node1, 'replicated_mt', 'all_3_3_0') corrupt_part_data_on_disk(node1, 'replicated_mt', 'all_3_3_0')
with Pool(1) as p: with Pool(1) as p:
def optimize_with_delay(x): def optimize_with_delay(x):