mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-30 03:22:14 +00:00
168 lines
6.0 KiB
Python
168 lines
6.0 KiB
Python
# pylint: disable=line-too-long
|
|
# pylint: disable=unused-argument
|
|
# pylint: disable=redefined-outer-name:
|
|
|
|
import time
|
|
import pytest
|
|
|
|
from helpers.cluster import ClickHouseCluster
|
|
from helpers.network import PartitionManager
|
|
|
|
from helpers.test_tools import assert_eq_with_retry
|
|
|
|
cluster = ClickHouseCluster(__file__)
|
|
|
|
|
|
def _fill_nodes(nodes, shard):
|
|
for node in nodes:
|
|
node.query(
|
|
'''
|
|
CREATE DATABASE test;
|
|
|
|
CREATE TABLE real_table(date Date, id UInt32, dummy UInt32)
|
|
ENGINE = MergeTree(date, id, 8192);
|
|
|
|
CREATE TABLE other_table(date Date, id UInt32, dummy UInt32)
|
|
ENGINE = MergeTree(date, id, 8192);
|
|
|
|
CREATE TABLE test_table(date Date, id UInt32, dummy UInt32)
|
|
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test{shard}/replicated', '{replica}', date, id, 8192);
|
|
'''.format(shard=shard, replica=node.name))
|
|
|
|
|
|
node1 = cluster.add_instance('node1', main_configs=['configs/remote_servers.xml'], with_zookeeper=True)
|
|
node2 = cluster.add_instance('node2', main_configs=['configs/remote_servers.xml'], with_zookeeper=True)
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
def normal_work():
|
|
try:
|
|
cluster.start()
|
|
|
|
_fill_nodes([node1, node2], 1)
|
|
|
|
yield cluster
|
|
|
|
finally:
|
|
cluster.shutdown()
|
|
|
|
|
|
def test_normal_work(normal_work):
|
|
node1.query("insert into test_table values ('2017-06-16', 111, 0)")
|
|
node1.query("insert into real_table values ('2017-06-16', 222, 0)")
|
|
|
|
assert_eq_with_retry(node1, "SELECT id FROM test_table order by id", '111')
|
|
assert_eq_with_retry(node1, "SELECT id FROM real_table order by id", '222')
|
|
assert_eq_with_retry(node2, "SELECT id FROM test_table order by id", '111')
|
|
|
|
node1.query("ALTER TABLE test_table REPLACE PARTITION 201706 FROM real_table")
|
|
|
|
assert_eq_with_retry(node1, "SELECT id FROM test_table order by id", '222')
|
|
assert_eq_with_retry(node2, "SELECT id FROM test_table order by id", '222')
|
|
|
|
|
|
node3 = cluster.add_instance('node3', main_configs=['configs/remote_servers.xml'], with_zookeeper=True)
|
|
node4 = cluster.add_instance('node4', main_configs=['configs/remote_servers.xml'], with_zookeeper=True)
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
def drop_failover():
|
|
try:
|
|
cluster.start()
|
|
|
|
_fill_nodes([node3, node4], 2)
|
|
|
|
yield cluster
|
|
|
|
finally:
|
|
cluster.shutdown()
|
|
|
|
|
|
def test_drop_failover(drop_failover):
|
|
node3.query("insert into test_table values ('2017-06-16', 111, 0)")
|
|
node3.query("insert into real_table values ('2017-06-16', 222, 0)")
|
|
|
|
assert_eq_with_retry(node3, "SELECT id FROM test_table order by id", '111')
|
|
assert_eq_with_retry(node3, "SELECT id FROM real_table order by id", '222')
|
|
assert_eq_with_retry(node4, "SELECT id FROM test_table order by id", '111')
|
|
|
|
with PartitionManager() as pm:
|
|
# Hinder replication between replicas
|
|
pm.partition_instances(node3, node4, port=9009)
|
|
# Disconnect Node4 from zookeper
|
|
pm.drop_instance_zk_connections(node4)
|
|
|
|
node3.query("ALTER TABLE test_table REPLACE PARTITION 201706 FROM real_table")
|
|
|
|
# Node3 replace is ok
|
|
assert_eq_with_retry(node3, "SELECT id FROM test_table order by id", '222')
|
|
# Network interrupted -- replace is not ok, but it's ok
|
|
assert_eq_with_retry(node4, "SELECT id FROM test_table order by id", '111')
|
|
|
|
# Drop partition on source node
|
|
node3.query("ALTER TABLE test_table DROP PARTITION 201706")
|
|
|
|
# Wait few seconds for connection to zookeeper to be restored
|
|
time.sleep(5)
|
|
|
|
msg = node4.query_with_retry(
|
|
"select last_exception from system.replication_queue where type = 'REPLACE_RANGE'",
|
|
check_callback=lambda x: 'Not found part' not in x, sleep_time=1)
|
|
assert 'Not found part' not in msg
|
|
assert_eq_with_retry(node4, "SELECT id FROM test_table order by id", '')
|
|
|
|
|
|
node5 = cluster.add_instance('node5', main_configs=['configs/remote_servers.xml'], with_zookeeper=True)
|
|
node6 = cluster.add_instance('node6', main_configs=['configs/remote_servers.xml'], with_zookeeper=True)
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
def replace_after_replace_failover():
|
|
try:
|
|
cluster.start()
|
|
|
|
_fill_nodes([node5, node6], 3)
|
|
|
|
yield cluster
|
|
|
|
finally:
|
|
cluster.shutdown()
|
|
|
|
|
|
def test_replace_after_replace_failover(replace_after_replace_failover):
|
|
node5.query("insert into test_table values ('2017-06-16', 111, 0)")
|
|
node5.query("insert into real_table values ('2017-06-16', 222, 0)")
|
|
node5.query("insert into other_table values ('2017-06-16', 333, 0)")
|
|
|
|
assert_eq_with_retry(node5, "SELECT id FROM test_table order by id", '111')
|
|
assert_eq_with_retry(node5, "SELECT id FROM real_table order by id", '222')
|
|
assert_eq_with_retry(node5, "SELECT id FROM other_table order by id", '333')
|
|
assert_eq_with_retry(node6, "SELECT id FROM test_table order by id", '111')
|
|
|
|
with PartitionManager() as pm:
|
|
# Hinder replication between replicas
|
|
pm.partition_instances(node5, node6, port=9009)
|
|
# Disconnect Node6 from zookeper
|
|
pm.drop_instance_zk_connections(node6)
|
|
|
|
node5.query("ALTER TABLE test_table REPLACE PARTITION 201706 FROM real_table")
|
|
|
|
# Node5 replace is ok
|
|
assert_eq_with_retry(node5, "SELECT id FROM test_table order by id", '222')
|
|
# Network interrupted -- replace is not ok, but it's ok
|
|
assert_eq_with_retry(node6, "SELECT id FROM test_table order by id", '111')
|
|
|
|
# Replace partition on source node
|
|
node5.query("ALTER TABLE test_table REPLACE PARTITION 201706 FROM other_table")
|
|
|
|
assert_eq_with_retry(node5, "SELECT id FROM test_table order by id", '333')
|
|
|
|
# Wait few seconds for connection to zookeeper to be restored
|
|
time.sleep(5)
|
|
|
|
msg = node6.query_with_retry(
|
|
"select last_exception from system.replication_queue where type = 'REPLACE_RANGE'",
|
|
check_callback=lambda x: 'Not found part' not in x, sleep_time=1)
|
|
assert 'Not found part' not in msg
|
|
assert_eq_with_retry(node6, "SELECT id FROM test_table order by id", '333')
|