# ClickHouse/tests/integration/test_replace_partition/test.py
# pylint: disable=line-too-long
# pylint: disable=unused-argument
# pylint: disable=redefined-outer-name
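"""Integration tests for ALTER TABLE ... REPLACE PARTITION on replicated tables,
including failover scenarios where one replica is cut off from its peer and from
ZooKeeper while the partition is replaced or dropped on the source replica."""
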
import time

import pytest
from helpers.cluster import ClickHouseCluster
from helpers.network import PartitionManager
from helpers.test_tools import assert_eq_with_retry

cluster = ClickHouseCluster(__file__)


def _fill_nodes(nodes, shard):
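    """Create the test database on each node: two plain MergeTree tables
    (real_table and other_table) and a ReplicatedMergeTree table (test_table)
    whose replicas share the given shard path in ZooKeeper."""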
    for node in nodes:
        node.query(
            """
            CREATE DATABASE test;

            CREATE TABLE real_table(date Date, id UInt32, dummy UInt32)
            ENGINE = MergeTree PARTITION BY toYYYYMM(date) ORDER BY id;

            CREATE TABLE other_table(date Date, id UInt32, dummy UInt32)
            ENGINE = MergeTree PARTITION BY toYYYYMM(date) ORDER BY id;

            CREATE TABLE test_table(date Date, id UInt32, dummy UInt32)
            ENGINE = ReplicatedMergeTree('/clickhouse/tables/test{shard}/replicated', '{replica}') PARTITION BY toYYYYMM(date) ORDER BY id;
            """.format(
                shard=shard, replica=node.name
            )
        )


node1 = cluster.add_instance(
    "node1", main_configs=["configs/remote_servers.xml"], with_zookeeper=True
)
node2 = cluster.add_instance(
    "node2", main_configs=["configs/remote_servers.xml"], with_zookeeper=True
)

@pytest.fixture(scope="module")
def normal_work():
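    """Start the cluster and provision node1 and node2 as replicas of shard 1."""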
    try:
        cluster.start()
        _fill_nodes([node1, node2], 1)
        yield cluster
    finally:
        cluster.shutdown()

def test_normal_work(normal_work):
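    """Replace a partition in the replicated table from a plain MergeTree table
    and check that the replacement data reaches both replicas."""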
node1.query("insert into test_table values ('2017-06-16', 111, 0)")
node1.query("insert into real_table values ('2017-06-16', 222, 0)")
assert_eq_with_retry(node1, "SELECT id FROM test_table order by id", "111")
assert_eq_with_retry(node1, "SELECT id FROM real_table order by id", "222")
assert_eq_with_retry(node2, "SELECT id FROM test_table order by id", "111")
node1.query("ALTER TABLE test_table REPLACE PARTITION 201706 FROM real_table")
assert_eq_with_retry(node1, "SELECT id FROM test_table order by id", "222")
assert_eq_with_retry(node2, "SELECT id FROM test_table order by id", "222")
node3 = cluster.add_instance(
    "node3", main_configs=["configs/remote_servers.xml"], with_zookeeper=True
)
node4 = cluster.add_instance(
    "node4", main_configs=["configs/remote_servers.xml"], with_zookeeper=True
)

@pytest.fixture(scope="module")
def drop_failover():
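    """Start the cluster and provision node3 and node4 as replicas of shard 2."""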
    try:
        cluster.start()
        _fill_nodes([node3, node4], 2)
        yield cluster
    finally:
        cluster.shutdown()

def test_drop_failover(drop_failover):
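    """Replace a partition while node4 is cut off from node3 and ZooKeeper, then
    drop that partition on the source replica. Once connectivity returns, node4
    must process the queued REPLACE_RANGE without getting stuck on "Not found
    part" and end up with an empty table, matching the drop."""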
node3.query("insert into test_table values ('2017-06-16', 111, 0)")
node3.query("insert into real_table values ('2017-06-16', 222, 0)")
assert_eq_with_retry(node3, "SELECT id FROM test_table order by id", "111")
assert_eq_with_retry(node3, "SELECT id FROM real_table order by id", "222")
assert_eq_with_retry(node4, "SELECT id FROM test_table order by id", "111")
    with PartitionManager() as pm:
        # Hinder replication between replicas
        pm.partition_instances(node3, node4, port=9009)

        # Disconnect node4 from ZooKeeper
        pm.drop_instance_zk_connections(node4)

        node3.query("ALTER TABLE test_table REPLACE PARTITION 201706 FROM real_table")

        # The replace succeeded on node3
        assert_eq_with_retry(node3, "SELECT id FROM test_table order by id", "222")
        # The network is cut, so the replace has not reached node4 yet -- that is expected
        assert_eq_with_retry(node4, "SELECT id FROM test_table order by id", "111")

        # Drop the partition on the source node while node4 is still disconnected
        node3.query("ALTER TABLE test_table DROP PARTITION 201706")

    # Wait a few seconds for the connection to ZooKeeper to be restored
    time.sleep(5)

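    # Poll the replication queue until the REPLACE_RANGE entry no longer
    # reports a "Not found part" exception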
    msg = node4.query_with_retry(
        "select last_exception from system.replication_queue where type = 'REPLACE_RANGE'",
        check_callback=lambda x: "Not found part" not in x,
        sleep_time=1,
    )
    assert "Not found part" not in msg

    assert_eq_with_retry(node4, "SELECT id FROM test_table order by id", "")

node5 = cluster.add_instance(
    "node5", main_configs=["configs/remote_servers.xml"], with_zookeeper=True
)
node6 = cluster.add_instance(
    "node6", main_configs=["configs/remote_servers.xml"], with_zookeeper=True
)

@pytest.fixture(scope="module")
def replace_after_replace_failover():
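    """Start the cluster and provision node5 and node6 as replicas of shard 3."""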
    try:
        cluster.start()
        _fill_nodes([node5, node6], 3)
        yield cluster
    finally:
        cluster.shutdown()

def test_replace_after_replace_failover(replace_after_replace_failover):
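    """Same failover scenario, but instead of dropping the partition the source
    replica replaces it a second time from other_table. Once connectivity
    returns, node6 must process the queued entries without getting stuck on
    "Not found part" and converge on the data from the second replace."""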
node5.query("insert into test_table values ('2017-06-16', 111, 0)")
node5.query("insert into real_table values ('2017-06-16', 222, 0)")
node5.query("insert into other_table values ('2017-06-16', 333, 0)")
assert_eq_with_retry(node5, "SELECT id FROM test_table order by id", "111")
assert_eq_with_retry(node5, "SELECT id FROM real_table order by id", "222")
assert_eq_with_retry(node5, "SELECT id FROM other_table order by id", "333")
assert_eq_with_retry(node6, "SELECT id FROM test_table order by id", "111")
    with PartitionManager() as pm:
        # Hinder replication between replicas
        pm.partition_instances(node5, node6, port=9009)

        # Disconnect node6 from ZooKeeper
        pm.drop_instance_zk_connections(node6)

        node5.query("ALTER TABLE test_table REPLACE PARTITION 201706 FROM real_table")

        # The replace succeeded on node5
        assert_eq_with_retry(node5, "SELECT id FROM test_table order by id", "222")
        # The network is cut, so the replace has not reached node6 yet -- that is expected
        assert_eq_with_retry(node6, "SELECT id FROM test_table order by id", "111")

        # Replace the partition again on the source node while node6 is still disconnected
        node5.query("ALTER TABLE test_table REPLACE PARTITION 201706 FROM other_table")
        assert_eq_with_retry(node5, "SELECT id FROM test_table order by id", "333")

    # Wait a few seconds for the connection to ZooKeeper to be restored
    time.sleep(5)

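    # As above: wait until the REPLACE_RANGE entry no longer fails with "Not found part"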
    msg = node6.query_with_retry(
        "select last_exception from system.replication_queue where type = 'REPLACE_RANGE'",
        check_callback=lambda x: "Not found part" not in x,
        sleep_time=1,
    )
    assert "Not found part" not in msg

    assert_eq_with_retry(node6, "SELECT id FROM test_table order by id", "333")